ServicePointScheduler.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. //
  2. // ServicePointScheduler.cs
  3. //
  4. // Author:
  5. // Martin Baulig <[email protected]>
  6. //
  7. // Copyright (c) 2017 Xamarin Inc. (http://www.xamarin.com)
  8. //
  9. // Permission is hereby granted, free of charge, to any person obtaining a copy
  10. // of this software and associated documentation files (the "Software"), to deal
  11. // in the Software without restriction, including without limitation the rights
  12. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  13. // copies of the Software, and to permit persons to whom the Software is
  14. // furnished to do so, subject to the following conditions:
  15. //
  16. // The above copyright notice and this permission notice shall be included in
  17. // all copies or substantial portions of the Software.
  18. //
  19. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  20. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  21. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  22. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  23. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  24. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  25. // THE SOFTWARE.
  26. using System.IO;
  27. using System.Collections;
  28. using System.Collections.Generic;
  29. using System.Net.Sockets;
  30. using System.Threading;
  31. using System.Threading.Tasks;
  32. using System.Runtime.ExceptionServices;
  33. using System.Diagnostics;
  34. namespace System.Net
  35. {
  36. class ServicePointScheduler
  37. {
  // The service point this scheduler was created for (assigned once in the constructor).
  // The same object is used as the monitor in every lock statement of this class.
  public ServicePoint ServicePoint { get; }
  // Idle timeout in milliseconds (it is passed to Task.Delay and to the scheduler wait).
  // Setting a different value stores it and calls Run () so the scheduler loop is
  // (re)started or woken up.
  // Throws ArgumentOutOfRangeException when the value is below Timeout.Infinite.
  public int MaxIdleTime {
      get { return maxIdleTime; }
      set {
          // An int can never exceed Int32.MaxValue, so only the lower bound is checked.
          if (value < Timeout.Infinite)
              throw new ArgumentOutOfRangeException (nameof (value));
          // Nothing to do when the setting is unchanged.
          if (value == maxIdleTime)
              return;
          maxIdleTime = value;
          Debug ($"MAX IDLE TIME = {value}");
          Run ();
      }
  }
  // Connection limit stored by this scheduler. Setting a different value stores it and
  // calls Run () so the scheduler loop is (re)started or woken up.
  // Throws ArgumentOutOfRangeException when the value is zero or negative.
  public int ConnectionLimit {
      get { return connectionLimit; }
      set {
          if (value <= 0)
              throw new ArgumentOutOfRangeException (nameof (value));
          // Nothing to do when the setting is unchanged.
          if (value == connectionLimit)
              return;
          connectionLimit = value;
          Debug ($"CONNECTION LIMIT = {value}");
          Run ();
      }
  }
  // Creates a scheduler for the given service point. The arguments are stored as-is
  // (no validation happens here); the scheduler loop is not started until Run () is called.
  public ServicePointScheduler (ServicePoint servicePoint, int connectionLimit, int maxIdleTime)
  {
      // Settings supplied by the caller.
      ServicePoint = servicePoint;
      this.maxIdleTime = maxIdleTime;
      this.connectionLimit = connectionLimit;

      // Bookkeeping lists start out empty; the event starts unsignalled.
      idleConnections = new LinkedList<(ConnectionGroup, WebConnection, Task)> ();
      operations = new LinkedList<(ConnectionGroup, WebOperation)> ();
      schedulerEvent = new AsyncManualResetEvent (false);

      // The default group uses the empty string as its name.
      defaultGroup = new ConnectionGroup (this, string.Empty);

      idleSince = DateTime.UtcNow;
  }
  // Debug trace with string.Format-style arguments, prefixed with this scheduler's ID.
  // Calls are only compiled in when MONO_WEB_DEBUG is defined.
  [Conditional ("MONO_WEB_DEBUG")]
  void Debug (string message, params object[] args)
  {
      var formatted = string.Format (message, args);
      WebConnection.Debug ($"SPS({ID}): {formatted}");
  }
  // Debug trace of a ready-made message, prefixed with this scheduler's ID.
  // Calls are only compiled in when MONO_WEB_DEBUG is defined.
  [Conditional ("MONO_WEB_DEBUG")]
  void Debug (string message) => WebConnection.Debug ($"SPS({ID}): {message}");
  // --- Scheduler loop state ---
  // 1 while the loop in StartScheduler () is active, 0 otherwise (flipped via CompareExchange in Run ()).
  int running;
  // Signalled by Run () to wake the loop up; reset at the start of each scheduler iteration.
  AsyncManualResetEvent schedulerEvent;
  // Backing field of IdleSince.
  DateTime idleSince;

  // --- Settings and counters ---
  // Backing field of ConnectionLimit.
  int connectionLimit;
  // Backing field of MaxIdleTime, in milliseconds; the constructor overwrites this initial value.
  int maxIdleTime = 100000;
  // Backing field of CurrentConnections; only changed through Interlocked operations.
  int currentConnections;

  // --- Connection groups and work lists ---
  // Group used when no (or an empty) group name is given.
  ConnectionGroup defaultGroup;
  // Named groups; null while no named group exists.
  Dictionary<string, ConnectionGroup> groups;
  // Operations that have been started on a connection, with their owning group.
  LinkedList<(ConnectionGroup, WebOperation)> operations;
  // Connections kept open after their operation finished, paired with the delay task
  // whose completion triggers closing them.
  LinkedList<(ConnectionGroup, WebConnection, Task)> idleConnections;
  // Current value of the connection counter maintained by
  // OnConnectionCreated () / OnConnectionClosed ().
  public int CurrentConnections => currentConnections;
  // UTC timestamp recorded when the scheduler was created or its loop last stopped.
  // While the loop is running this holds a value about ten years in the future.
  public DateTime IdleSince => idleSince;
  // Counter shared by all schedulers; used only to hand out instance IDs.
  static int nextId;
  // Sequential identifier of this instance, used in debug output.
  // Incremented atomically because the counter is static and instances may be
  // created from different threads.
  public readonly int ID = Interlocked.Increment (ref nextId);
  // Get-only property; nothing in this class assigns it, so it keeps its default value.
  internal string ME { get; }
  // Starts the scheduler loop if it is not running yet and signals the scheduler event
  // so that a waiting loop wakes up.
  public void Run ()
  {
      lock (ServicePoint) {
          // Atomically flip 'running' from 0 to 1; only the caller that wins starts the loop.
          bool wasStopped = Interlocked.CompareExchange (ref running, 1, 0) == 0;
          if (wasStopped)
              StartScheduler ();

          schedulerEvent.Set ();
      }
  }
  // Main scheduler loop. Each round snapshots the running operations and idle connections,
  // waits until the scheduler event fires (or maxIdleTime elapses), an operation completes,
  // or an idle connection's delay task completes, and then handles whichever finished.
  // The loop ends - resetting 'running' to 0 - once no groups, operations or idle
  // connections remain.
  async void StartScheduler ()
  {
      // While the loop is alive, IdleSince reports a time roughly ten years ahead.
      idleSince = DateTime.UtcNow + TimeSpan.FromDays (3650);

      for (;;) {
          Debug ($"MAIN LOOP");

          ValueTuple<ConnectionGroup, WebOperation>[] activeOps;
          ValueTuple<ConnectionGroup, WebConnection, Task>[] idleSnapshot;
          var waitTasks = new List<Task> ();

          lock (ServicePoint) {
              Cleanup ();

              bool nothingLeft = groups == null && defaultGroup.IsEmpty () &&
                  operations.Count == 0 && idleConnections.Count == 0;
              if (nothingLeft) {
                  Debug ($"MAIN LOOP DONE");
                  running = 0;
                  idleSince = DateTime.UtcNow;
                  schedulerEvent.Reset ();
                  return;
              }

              // Snapshot both lists so they can be inspected after the await.
              activeOps = new ValueTuple<ConnectionGroup, WebOperation>[operations.Count];
              operations.CopyTo (activeOps, 0);
              idleSnapshot = new ValueTuple<ConnectionGroup, WebConnection, Task>[idleConnections.Count];
              idleConnections.CopyTo (idleSnapshot, 0);

              // Layout of waitTasks: slot 0 is the scheduler event / timeout, followed by one
              // slot per running operation, followed by one slot per idle connection.
              waitTasks.Add (schedulerEvent.WaitAsync (maxIdleTime));
              for (int i = 0; i < activeOps.Length; i++)
                  waitTasks.Add (activeOps[i].Item2.WaitForCompletion (true));
              for (int i = 0; i < idleSnapshot.Length; i++)
                  waitTasks.Add (idleSnapshot[i].Item3);
          }

          Debug ($"MAIN LOOP #1: operations={activeOps.Length} idle={idleSnapshot.Length}");

          var finished = await Task.WhenAny (waitTasks).ConfigureAwait (false);

          lock (ServicePoint) {
              // First slot that holds the finished task (reference comparison).
              int slot = waitTasks.IndexOf (finished);

              // Slot 0: the scheduler was signalled or the wait timed out.
              if (slot == 0) {
                  RunSchedulerIteration ();
                  continue;
              }

              // Slots 1 .. activeOps.Length: a running operation completed.
              int opIndex = slot - 1;
              if (slot > 0 && opIndex < activeOps.Length) {
                  var entry = activeOps[opIndex];
                  Debug ($"MAIN LOOP #2: {opIndex} group={entry.Item1.ID} Op={entry.Item2.ID}");
                  operations.Remove (entry);
                  var opTask = (Task<ValueTuple<bool, WebOperation>>)finished;
                  bool runLoop = OperationCompleted (entry.Item1, entry.Item2, opTask);
                  Debug ($"MAIN LOOP #2 DONE: {opIndex} {runLoop}");
                  if (runLoop)
                      RunSchedulerIteration ();
                  continue;
              }

              // Remaining slots: an idle connection's delay task completed.
              if (slot > 0) {
                  int idleIndex = opIndex - activeOps.Length;
                  var entry = idleSnapshot[idleIndex];
                  Debug ($"MAIN LOOP #3: {idleIndex} group={entry.Item1.ID} Cnc={entry.Item2.ID}");
                  idleConnections.Remove (entry);
                  CloseIdleConnection (entry.Item1, entry.Item2);
              }
          }
      }
  }
  // Removes every named group that reports itself empty and drops the dictionary
  // altogether once no named group is left.
  void Cleanup ()
  {
      if (groups == null)
          return;

      // Work on a copy of the keys because entries are removed while iterating.
      var names = new string[groups.Count];
      groups.Keys.CopyTo (names, 0);

      foreach (var name in names) {
          if (!groups.TryGetValue (name, out ConnectionGroup candidate) || !candidate.IsEmpty ())
              continue;
          Debug ($"CLEANUP - REMOVING group={candidate.ID}");
          groups.Remove (name);
      }

      if (groups.Count == 0)
          groups = null;
  }
  // Resets the scheduler event, then runs SchedulerIteration () over the default group and
  // every named group, repeating the whole pass for as long as any group started an operation.
  void RunSchedulerIteration ()
  {
      schedulerEvent.Reset ();

      while (true) {
          Debug ($"ITERATION");
          bool again = SchedulerIteration (defaultGroup);
          Debug ($"ITERATION #1: {again} {groups != null}");

          if (groups != null) {
              foreach (var named in groups.Values) {
                  Debug ($"ITERATION #2: group={named.ID}");
                  // Every group gets its turn, even if an earlier one already asked for a repeat.
                  if (SchedulerIteration (named))
                      again = true;
              }
          }

          Debug ($"ITERATION #3: {again}");
          if (!again)
              break;
      }
  }
  // Handles a finished operation. 'task' carries a success flag and an optional follow-up
  // operation. Returns true when the caller should run a scheduler iteration (no follow-up
  // operation was returned), false when a follow-up operation has been started.
  bool OperationCompleted (ConnectionGroup group, WebOperation operation, Task<(bool, WebOperation)> task)
  {
#if MONO_WEB_DEBUG
      var me = $"{nameof (OperationCompleted)}(group={group.ID}, Op={operation.ID}, Cnc={operation.Connection.ID})";
#else
      string me = null;
#endif

      // A task that did not run to completion counts as "failed, no follow-up".
      bool ok = false;
      WebOperation next = null;
      if (task.Status == TaskStatus.RanToCompletion)
          (ok, next) = task.Result;

      Debug ($"{me}: {task.Status} {ok} {next?.ID}");

      // The connection is kept only if the operation succeeded and the connection
      // agrees to continue; otherwise it is removed from its group.
      bool keepConnection = ok && operation.Connection.Continue (next);
      if (!keepConnection)
          group.RemoveConnection (operation.Connection);

      if (next == null) {
          if (keepConnection) {
              // Park the connection together with a delay task that fires after MaxIdleTime.
              var idleTask = Task.Delay (MaxIdleTime);
              idleConnections.AddLast ((group, operation.Connection, idleTask));
              Debug ($"{me} keeping connection open for {MaxIdleTime} milliseconds.");
          } else {
              Debug ($"{me}: closed connection and done.");
          }
          return true;
      }

      Debug ($"{me} got new operation next={next.ID}.");
      operations.AddLast ((group, next));

      if (keepConnection) {
          Debug ($"{me} continuing next={next.ID} on same connection.");
          RemoveIdleConnection (operation.Connection);
          return false;
      }

      // The old connection is gone: run the follow-up on another one, forcing creation.
      group.Cleanup ();
      var (connection, created) = group.CreateOrReuseConnection (next, true);
      Debug ($"{me} created new connection Cnc={connection.ID} next={next.ID}.");
      return false;
  }
  // Closes a connection whose idle delay has elapsed: removes it from its group and
  // makes sure no entry for it remains in the idle list.
  void CloseIdleConnection (ConnectionGroup group, WebConnection connection)
  {
      Debug ($"{nameof (CloseIdleConnection)}(group={group.ID}, Cnc={connection.ID}) closing idle connection.");

      group.RemoveConnection (connection);
      RemoveIdleConnection (connection);
  }
  // Tries to start the next queued operation of 'group'.
  // Returns true when an operation was started (the caller should iterate again),
  // false when the queue yields nothing or no connection could take the operation.
  bool SchedulerIteration (ConnectionGroup group)
  {
#if MONO_WEB_DEBUG
      var me = $"{nameof (SchedulerIteration)}(group={group.ID})";
#else
      string me = null;
#endif
      Debug ($"{me}");

      // First, let's clean up.
      group.Cleanup ();

      // Is there anything in the queue?
      var next = group.GetNextOperation ();
      if (next == null) {
          // Only report an empty queue when it really is empty.
          Debug ($"{me} no pending operations.");
          return false;
      }

      Debug ($"{me} found pending operation Op={next.ID}");

      var (connection, _) = group.CreateOrReuseConnection (next, false);
      if (connection == null) {
          // All connections are currently busy, need to keep it in the queue for now.
          Debug ($"{me} all connections busy, keeping operation in queue.");
          return false;
      }

      Debug ($"{me} started operation: Op={next.ID} Cnc={connection.ID}");
      operations.AddLast ((group, next));
      // The connection is in use again, so it must not be treated as idle.
      RemoveIdleConnection (connection);
      return true;
  }
  // Removes every entry of the 'operations' list that refers to the given operation.
  void RemoveOperation (WebOperation operation)
  {
      for (var node = operations.First; node != null;) {
          // Remember the successor before the node is possibly unlinked.
          var following = node.Next;
          if (node.Value.Item2 == operation)
              operations.Remove (node);
          node = following;
      }
  }
  // Removes every entry of the 'idleConnections' list that refers to the given connection.
  void RemoveIdleConnection (WebConnection connection)
  {
      for (var node = idleConnections.First; node != null;) {
          // Remember the successor before the node is possibly unlinked.
          var following = node.Next;
          if (node.Value.Item2 == connection)
              idleConnections.Remove (node);
          node = following;
      }
  }
  // Queues an operation on the connection group with the given name (the default group
  // for a null or empty name) and wakes up the scheduler.
  public void SendRequest (WebOperation operation, string groupName)
  {
      lock (ServicePoint) {
          var target = GetConnectionGroup (groupName);
          Debug ($"SEND REQUEST: Op={operation.ID} group={target.ID}");

          target.EnqueueOperation (operation);
          Run ();

          Debug ($"SEND REQUEST DONE: Op={operation.ID} group={target.ID}");
      }
  }
  // Closes the connection group with the given name (the default group for a null or
  // empty name). Returns false when no group with that name exists, true otherwise.
  public bool CloseConnectionGroup (string groupName)
  {
      lock (ServicePoint) {
          ConnectionGroup target;
          if (string.IsNullOrEmpty (groupName)) {
              target = defaultGroup;
          } else if (groups == null || !groups.TryGetValue (groupName, out target)) {
              return false;
          }

          Debug ($"CLOSE CONNECTION GROUP: group={target.ID}");

          // Named groups are unregistered; the default group always stays around.
          if (target != defaultGroup) {
              groups.Remove (groupName);
              if (groups.Count == 0)
                  groups = null;
          }

          target.Close ();
          Run ();
          return true;
      }
  }
  // Returns the group registered under 'name', creating and registering it on first use.
  // A null or empty name maps to the default group.
  ConnectionGroup GetConnectionGroup (string name)
  {
      lock (ServicePoint) {
          if (string.IsNullOrEmpty (name))
              return defaultGroup;

          // The dictionary only exists while at least one named group has been requested.
          groups = groups ?? new Dictionary<string, ConnectionGroup> ();

          if (!groups.TryGetValue (name, out ConnectionGroup found)) {
              found = new ConnectionGroup (this, name);
              groups.Add (name, found);
          }
          return found;
      }
  }
  // Called by a connection group after it created a connection: bumps the connection counter.
  void OnConnectionCreated (WebConnection connection) => Interlocked.Increment (ref currentConnections);
  // Called by a connection group when a connection goes away: drops any idle-list
  // entries for it and lowers the connection counter.
  void OnConnectionClosed (WebConnection connection)
  {
      RemoveIdleConnection (connection);
      Interlocked.Decrement (ref currentConnections);
  }
  366. class ConnectionGroup
  367. {
  // The scheduler that owns this group (assigned once in the constructor).
  public ServicePointScheduler Scheduler { get; }
  // The group name given to the constructor; empty for the scheduler's default group.
  public string Name { get; }
  // True when this group has no name (null or empty).
  public bool IsDefault {
      get { return string.IsNullOrEmpty (Name); }
  }
  // Counter shared by all groups; used only to hand out instance IDs.
  static int nextId;
  // Sequential identifier of this group, used in debug output.
  // Incremented atomically because the counter is static and groups of different
  // schedulers are created under different locks.
  public readonly int ID = Interlocked.Increment (ref nextId);
  // Connections currently owned by this group.
  LinkedList<WebConnection> connections;
  // Operations waiting to be started on a connection.
  LinkedList<WebOperation> queue;
  // Creates an empty group (no connections, no queued operations) owned by 'scheduler'.
  public ConnectionGroup (ServicePointScheduler scheduler, string name)
  {
      Name = name;
      Scheduler = scheduler;

      queue = new LinkedList<WebOperation> ();
      connections = new LinkedList<WebConnection> ();
  }
  // True when the group holds neither connections nor queued operations.
  public bool IsEmpty () => connections.Count == 0 && queue.Count == 0;
  // Removes a connection from this group and disposes it.
  // The scheduler's connection counter is only lowered when the connection was still
  // tracked by this group. Close () and Cleanup () already report connections they drop,
  // so reporting again here would count the same connection twice.
  public void RemoveConnection (WebConnection connection)
  {
      Scheduler.Debug ($"REMOVING CONNECTION: group={ID} cnc={connection.ID}");
      bool wasTracked = connections.Remove (connection);
      connection.Dispose ();
      if (wasTracked) {
          // Also removes idle-list entries and decrements the counter.
          Scheduler.OnConnectionClosed (connection);
      } else {
          // Already accounted for; just make sure no idle-list entry lingers.
          Scheduler.RemoveIdleConnection (connection);
      }
  }
  // Drops every connection that reports itself as closed and tells the scheduler about it.
  public void Cleanup ()
  {
      for (var node = connections.First; node != null;) {
          // Remember the successor before the node is possibly unlinked.
          var following = node.Next;
          var current = node.Value;
          if (current.Closed) {
              Scheduler.Debug ($"REMOVING CONNECTION: group={ID} cnc={current.ID}");
              connections.Remove (node);
              Scheduler.OnConnectionClosed (current);
          }
          node = following;
      }
  }
  // Shuts the group down: aborts every queued operation, disposes every connection,
  // and leaves both lists empty.
  public void Close ()
  {
      // Queued operations are aborted and removed from the scheduler's operation list.
      foreach (var pending in queue) {
          pending.Abort ();
          Scheduler.RemoveOperation (pending);
      }
      queue.Clear ();

      // Each connection is disposed and reported to the scheduler as closed.
      foreach (var open in connections) {
          open.Dispose ();
          Scheduler.OnConnectionClosed (open);
      }
      connections.Clear ();
  }
  // Appends an operation to the end of this group's queue.
  public void EnqueueOperation (WebOperation operation) => queue.AddLast (operation);
  // Returns the first queued operation that has not been aborted, without dequeuing it.
  // Aborted operations found in front of it are removed from the queue and from the
  // scheduler's operation list. Returns null when nothing usable is queued.
  public WebOperation GetNextOperation ()
  {
      var node = queue.First;
      while (node != null) {
          var candidate = node.Value;
          var following = node.Next;

          if (!candidate.Aborted)
              return candidate;

          queue.Remove (node);
          Scheduler.RemoveOperation (candidate);
          node = following;
      }
      return null;
  }
  // Looks for an existing connection that accepts 'operation'. On success the operation
  // is removed from the queue and the accepting connection is returned; otherwise null.
  public WebConnection FindIdleConnection (WebOperation operation)
  {
      // Pass 1: among the connections that say they can be reused, prefer the one
      // with the latest IdleSince value.
      WebConnection best = null;
      foreach (var current in connections) {
          if (!current.CanReuseConnection (operation))
              continue;
          if (best == null || current.IdleSince > best.IdleSince)
              best = current;
      }

      // The preferred connection still has to agree to run the operation.
      if (best != null && best.StartOperation (operation, true)) {
          queue.Remove (operation);
          return best;
      }

      // Pass 2: take the first connection that accepts the operation.
      foreach (var current in connections) {
          if (!current.StartOperation (operation, true))
              continue;
          queue.Remove (operation);
          return current;
      }

      return null;
  }
  // Finds a connection for 'operation': an existing one if any accepts it, otherwise a new
  // one when 'force' is set, the group is below the service point's connection limit, or
  // the group has no connections at all. Returns (null, false) when neither is possible;
  // 'created' tells whether the returned connection is new.
  public (WebConnection connection, bool created) CreateOrReuseConnection (WebOperation operation, bool force)
  {
      var existing = FindIdleConnection (operation);
      if (existing != null)
          return (existing, false);

      bool mayCreate = force || Scheduler.ServicePoint.ConnectionLimit > connections.Count || connections.Count == 0;
      if (!mayCreate)
          return (null, false);

      var fresh = new WebConnection (Scheduler.ServicePoint);
      fresh.StartOperation (operation, false);
      connections.AddFirst (fresh);
      Scheduler.OnConnectionCreated (fresh);
      queue.Remove (operation);
      return (fresh, true);
  }
  484. }
  // https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
  //
  // Awaitable manual-reset event built on a TaskCompletionSource: the current source's
  // task is completed while the event is set, and Reset () swaps in a fresh source.
  class AsyncManualResetEvent
  {
      volatile TaskCompletionSource<bool> m_tcs = CreateSource ();

      // Sources are created with RunContinuationsAsynchronously so that Set () never runs
      // waiter continuations inline on the caller's thread.
      static TaskCompletionSource<bool> CreateSource ()
      {
          return new TaskCompletionSource<bool> (TaskCreationOptions.RunContinuationsAsynchronously);
      }

      // Task that completes once the event is set.
      public Task WaitAsync () { return m_tcs.Task; }

      // Blocks up to 'millisecondTimeout' milliseconds; returns true when the event was set in time.
      public bool WaitOne (int millisecondTimeout)
      {
          WebConnection.Debug ($"AMRE WAIT ONE: {millisecondTimeout}");
          return m_tcs.Task.Wait (millisecondTimeout);
      }

      // Waits for the event or for the timeout, whichever comes first.
      // Returns true when the event was set, false when the timeout elapsed.
      public async Task<bool> WaitAsync (int millisecondTimeout)
      {
          var eventTask = m_tcs.Task;
          using (var cts = new CancellationTokenSource ()) {
              var timeoutTask = Task.Delay (millisecondTimeout, cts.Token);
              var ret = await Task.WhenAny (eventTask, timeoutTask).ConfigureAwait (false);
              if (ret == timeoutTask)
                  return false;
              // The event won: cancel the delay so its timer does not linger until the timeout.
              cts.Cancel ();
              return true;
          }
      }

      // Signals the event. The task is completed before this method returns; continuations
      // are scheduled asynchronously, so the caller neither blocks nor runs waiter code.
      public void Set ()
      {
          m_tcs.TrySetResult (true);
      }

      // Returns the event to the unsignalled state. Does nothing if it is not currently set.
      public void Reset ()
      {
          while (true) {
              var tcs = m_tcs;
              // Retry only when another thread replaced the source between the read and the swap.
              if (!tcs.Task.IsCompleted ||
                  Interlocked.CompareExchange (ref m_tcs, CreateSource (), tcs) == tcs)
                  return;
          }
      }

      // Creates the event, initially set when 'state' is true.
      public AsyncManualResetEvent (bool state)
      {
          if (state)
              Set ();
      }
  }
  523. }
  524. }