RequestChannel.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Runtime;
  9. using System.ServiceModel;
  10. using System.ServiceModel.Diagnostics;
  11. using System.Threading;
  12. abstract class RequestChannel : ChannelBase, IRequestChannel
  13. {
  14. bool manualAddressing;
  15. List<IRequestBase> outstandingRequests = new List<IRequestBase>();
  16. EndpointAddress to;
  17. Uri via;
  18. ManualResetEvent closedEvent;
  19. bool closed;
  20. protected RequestChannel(ChannelManagerBase channelFactory, EndpointAddress to, Uri via, bool manualAddressing)
  21. : base(channelFactory)
  22. {
  23. if (!manualAddressing)
  24. {
  25. if (to == null)
  26. {
  27. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("to");
  28. }
  29. }
  30. this.manualAddressing = manualAddressing;
  31. this.to = to;
  32. this.via = via;
  33. }
  34. protected bool ManualAddressing
  35. {
  36. get
  37. {
  38. return this.manualAddressing;
  39. }
  40. }
  41. public EndpointAddress RemoteAddress
  42. {
  43. get
  44. {
  45. return this.to;
  46. }
  47. }
  48. public Uri Via
  49. {
  50. get
  51. {
  52. return this.via;
  53. }
  54. }
  55. protected void AbortPendingRequests()
  56. {
  57. IRequestBase[] requestsToAbort = CopyPendingRequests(false);
  58. if (requestsToAbort != null)
  59. {
  60. foreach (IRequestBase request in requestsToAbort)
  61. {
  62. request.Abort(this);
  63. }
  64. }
  65. }
  66. protected IAsyncResult BeginWaitForPendingRequests(TimeSpan timeout, AsyncCallback callback, object state)
  67. {
  68. IRequestBase[] pendingRequests = SetupWaitForPendingRequests();
  69. return new WaitForPendingRequestsAsyncResult(timeout, this, pendingRequests, callback, state);
  70. }
  71. protected void EndWaitForPendingRequests(IAsyncResult result)
  72. {
  73. WaitForPendingRequestsAsyncResult.End(result);
  74. }
  75. void FinishClose()
  76. {
  77. lock (outstandingRequests)
  78. {
  79. if (!closed)
  80. {
  81. closed = true;
  82. if (closedEvent != null)
  83. {
  84. this.closedEvent.Close();
  85. }
  86. }
  87. }
  88. }
  89. IRequestBase[] SetupWaitForPendingRequests()
  90. {
  91. return this.CopyPendingRequests(true);
  92. }
  93. protected void WaitForPendingRequests(TimeSpan timeout)
  94. {
  95. IRequestBase[] pendingRequests = SetupWaitForPendingRequests();
  96. if (pendingRequests != null)
  97. {
  98. if (!closedEvent.WaitOne(timeout, false))
  99. {
  100. foreach (IRequestBase request in pendingRequests)
  101. {
  102. request.Abort(this);
  103. }
  104. }
  105. }
  106. FinishClose();
  107. }
  108. IRequestBase[] CopyPendingRequests(bool createEventIfNecessary)
  109. {
  110. IRequestBase[] requests = null;
  111. lock (outstandingRequests)
  112. {
  113. if (outstandingRequests.Count > 0)
  114. {
  115. requests = new IRequestBase[outstandingRequests.Count];
  116. outstandingRequests.CopyTo(requests);
  117. outstandingRequests.Clear();
  118. if (createEventIfNecessary && closedEvent == null)
  119. {
  120. closedEvent = new ManualResetEvent(false);
  121. }
  122. }
  123. }
  124. return requests;
  125. }
  126. protected void FaultPendingRequests()
  127. {
  128. IRequestBase[] requestsToFault = CopyPendingRequests(false);
  129. if (requestsToFault != null)
  130. {
  131. foreach (IRequestBase request in requestsToFault)
  132. {
  133. request.Fault(this);
  134. }
  135. }
  136. }
  137. public override T GetProperty<T>()
  138. {
  139. if (typeof(T) == typeof(IRequestChannel))
  140. {
  141. return (T)(object)this;
  142. }
  143. T baseProperty = base.GetProperty<T>();
  144. if (baseProperty != null)
  145. {
  146. return baseProperty;
  147. }
  148. return default(T);
  149. }
  150. protected override void OnAbort()
  151. {
  152. AbortPendingRequests();
  153. }
  154. void ReleaseRequest(IRequestBase request)
  155. {
  156. if (request != null)
  157. {
  158. // Synchronization of OnReleaseRequest is the
  159. // responsibility of the concrete implementation of request.
  160. request.OnReleaseRequest();
  161. }
  162. lock (outstandingRequests)
  163. {
  164. // Remove supports the connection having been removed, so don't need extra Contains() check,
  165. // even though this may have been removed by Abort()
  166. outstandingRequests.Remove(request);
  167. if (outstandingRequests.Count == 0)
  168. {
  169. if (!closed && closedEvent != null)
  170. {
  171. closedEvent.Set();
  172. }
  173. }
  174. }
  175. }
  176. void TrackRequest(IRequestBase request)
  177. {
  178. lock (outstandingRequests)
  179. {
  180. ThrowIfDisposedOrNotOpen(); // make sure that we haven't already snapshot our collection
  181. outstandingRequests.Add(request);
  182. }
  183. }
  184. public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
  185. {
  186. return this.BeginRequest(message, this.DefaultSendTimeout, callback, state);
  187. }
  188. public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  189. {
  190. if (message == null)
  191. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("message");
  192. if (timeout < TimeSpan.Zero)
  193. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  194. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  195. ThrowIfDisposedOrNotOpen();
  196. AddHeadersTo(message);
  197. IAsyncRequest asyncRequest = CreateAsyncRequest(message, callback, state);
  198. TrackRequest(asyncRequest);
  199. bool throwing = true;
  200. try
  201. {
  202. asyncRequest.BeginSendRequest(message, timeout);
  203. throwing = false;
  204. }
  205. finally
  206. {
  207. if (throwing)
  208. {
  209. ReleaseRequest(asyncRequest);
  210. }
  211. }
  212. return asyncRequest;
  213. }
  214. protected abstract IRequest CreateRequest(Message message);
  215. protected abstract IAsyncRequest CreateAsyncRequest(Message message, AsyncCallback callback, object state);
  216. public Message EndRequest(IAsyncResult result)
  217. {
  218. if (result == null)
  219. {
  220. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
  221. }
  222. IAsyncRequest asyncRequest = result as IAsyncRequest;
  223. if (asyncRequest == null)
  224. {
  225. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("result", SR.GetString(SR.InvalidAsyncResult));
  226. }
  227. try
  228. {
  229. Message reply = asyncRequest.End();
  230. if (DiagnosticUtility.ShouldTraceInformation)
  231. {
  232. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.RequestChannelReplyReceived,
  233. SR.GetString(SR.TraceCodeRequestChannelReplyReceived), reply);
  234. }
  235. return reply;
  236. }
  237. finally
  238. {
  239. ReleaseRequest(asyncRequest);
  240. }
  241. }
  242. public Message Request(Message message)
  243. {
  244. return this.Request(message, this.DefaultSendTimeout);
  245. }
  246. public Message Request(Message message, TimeSpan timeout)
  247. {
  248. if (message == null)
  249. {
  250. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("message");
  251. }
  252. if (timeout < TimeSpan.Zero)
  253. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  254. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  255. ThrowIfDisposedOrNotOpen();
  256. AddHeadersTo(message);
  257. IRequest request = CreateRequest(message);
  258. TrackRequest(request);
  259. try
  260. {
  261. Message reply;
  262. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  263. TimeSpan savedTimeout = timeoutHelper.RemainingTime();
  264. try
  265. {
  266. request.SendRequest(message, savedTimeout);
  267. }
  268. catch (TimeoutException timeoutException)
  269. {
  270. throw TraceUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.RequestChannelSendTimedOut, savedTimeout),
  271. timeoutException), message);
  272. }
  273. savedTimeout = timeoutHelper.RemainingTime();
  274. try
  275. {
  276. reply = request.WaitForReply(savedTimeout);
  277. }
  278. catch (TimeoutException timeoutException)
  279. {
  280. throw TraceUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.RequestChannelWaitForReplyTimedOut, savedTimeout),
  281. timeoutException), message);
  282. }
  283. if (DiagnosticUtility.ShouldTraceInformation)
  284. {
  285. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.RequestChannelReplyReceived,
  286. SR.GetString(SR.TraceCodeRequestChannelReplyReceived), reply);
  287. }
  288. return reply;
  289. }
  290. finally
  291. {
  292. ReleaseRequest(request);
  293. }
  294. }
  295. protected virtual void AddHeadersTo(Message message)
  296. {
  297. if (!manualAddressing && to != null)
  298. {
  299. to.ApplyTo(message);
  300. }
  301. }
  302. class WaitForPendingRequestsAsyncResult : AsyncResult
  303. {
  304. static WaitOrTimerCallback completeWaitCallBack = new WaitOrTimerCallback(OnCompleteWaitCallBack);
  305. IRequestBase[] pendingRequests;
  306. RequestChannel requestChannel;
  307. TimeSpan timeout;
  308. RegisteredWaitHandle waitHandle;
  309. public WaitForPendingRequestsAsyncResult(TimeSpan timeout, RequestChannel requestChannel, IRequestBase[] pendingRequests, AsyncCallback callback, object state)
  310. : base(callback, state)
  311. {
  312. this.requestChannel = requestChannel;
  313. this.pendingRequests = pendingRequests;
  314. this.timeout = timeout;
  315. if (this.timeout == TimeSpan.Zero || this.pendingRequests == null)
  316. {
  317. AbortRequests();
  318. CleanupEvents();
  319. Complete(true);
  320. }
  321. else
  322. {
  323. this.waitHandle = ThreadPool.RegisterWaitForSingleObject(this.requestChannel.closedEvent, completeWaitCallBack, this, TimeoutHelper.ToMilliseconds(timeout), true);
  324. }
  325. }
  326. void AbortRequests()
  327. {
  328. if (pendingRequests != null)
  329. {
  330. foreach (IRequestBase request in pendingRequests)
  331. {
  332. request.Abort(this.requestChannel);
  333. }
  334. }
  335. }
  336. void CleanupEvents()
  337. {
  338. if (requestChannel.closedEvent != null)
  339. {
  340. if (waitHandle != null)
  341. {
  342. waitHandle.Unregister(requestChannel.closedEvent);
  343. }
  344. requestChannel.FinishClose();
  345. }
  346. }
  347. static void OnCompleteWaitCallBack(object state, bool timedOut)
  348. {
  349. WaitForPendingRequestsAsyncResult thisPtr = (WaitForPendingRequestsAsyncResult)state;
  350. Exception completionException = null;
  351. try
  352. {
  353. if (timedOut)
  354. {
  355. thisPtr.AbortRequests();
  356. }
  357. thisPtr.CleanupEvents();
  358. }
  359. #pragma warning suppress 56500 // [....], transferring exception to another thread
  360. catch (Exception e)
  361. {
  362. if (Fx.IsFatal(e))
  363. {
  364. throw;
  365. }
  366. completionException = e;
  367. }
  368. thisPtr.Complete(false, completionException);
  369. }
  370. public static void End(IAsyncResult result)
  371. {
  372. AsyncResult.End<WaitForPendingRequestsAsyncResult>(result);
  373. }
  374. }
  375. }
  376. interface IRequestBase
  377. {
  378. void Abort(RequestChannel requestChannel);
  379. void Fault(RequestChannel requestChannel);
  380. void OnReleaseRequest();
  381. }
  382. interface IRequest : IRequestBase
  383. {
  384. void SendRequest(Message message, TimeSpan timeout);
  385. Message WaitForReply(TimeSpan timeout);
  386. }
  387. interface IAsyncRequest : IAsyncResult, IRequestBase
  388. {
  389. void BeginSendRequest(Message message, TimeSpan timeout);
  390. Message End();
  391. }
  392. }