| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621 |
- //
- // ServicePointScheduler.cs
- //
- // Author:
- // Martin Baulig <[email protected]>
- //
- // Copyright (c) 2017 Xamarin Inc. (http://www.xamarin.com)
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- using System.IO;
- using System.Collections;
- using System.Collections.Generic;
- using System.Net.Sockets;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Runtime.ExceptionServices;
- using System.Diagnostics;
- namespace System.Net
- {
- class ServicePointScheduler
- {
- public ServicePoint ServicePoint {
- get;
- }
- public int MaxIdleTime {
- get { return maxIdleTime; }
- set {
- if (value < Timeout.Infinite || value > Int32.MaxValue)
- throw new ArgumentOutOfRangeException ();
- if (value == maxIdleTime)
- return;
- maxIdleTime = value;
- Debug ($"MAX IDLE TIME = {value}");
- Run ();
- }
- }
- public int ConnectionLimit {
- get { return connectionLimit; }
- set {
- if (value <= 0)
- throw new ArgumentOutOfRangeException ();
- if (value == connectionLimit)
- return;
- connectionLimit = value;
- Debug ($"CONNECTION LIMIT = {value}");
- Run ();
- }
- }
- public ServicePointScheduler (ServicePoint servicePoint, int connectionLimit, int maxIdleTime)
- {
- ServicePoint = servicePoint;
- this.connectionLimit = connectionLimit;
- this.maxIdleTime = maxIdleTime;
- schedulerEvent = new AsyncManualResetEvent (false);
- defaultGroup = new ConnectionGroup (this, string.Empty);
- operations = new LinkedList<(ConnectionGroup, WebOperation)> ();
- idleConnections = new LinkedList<(ConnectionGroup, WebConnection, Task)> ();
- idleSince = DateTime.UtcNow;
- }
- [Conditional ("MONO_WEB_DEBUG")]
- void Debug (string message, params object[] args)
- {
- WebConnection.Debug ($"SPS({ID}): {string.Format (message, args)}");
- }
- [Conditional ("MONO_WEB_DEBUG")]
- void Debug (string message)
- {
- WebConnection.Debug ($"SPS({ID}): {message}");
- }
- int running;
- int maxIdleTime = 100000;
- AsyncManualResetEvent schedulerEvent;
- ConnectionGroup defaultGroup;
- Dictionary<string, ConnectionGroup> groups;
- LinkedList<(ConnectionGroup, WebOperation)> operations;
- LinkedList<(ConnectionGroup, WebConnection, Task)> idleConnections;
- int currentConnections;
- int connectionLimit;
- DateTime idleSince;
- public int CurrentConnections {
- get {
- return currentConnections;
- }
- }
- public DateTime IdleSince {
- get {
- return idleSince;
- }
- }
- static int nextId;
- public readonly int ID = ++nextId;
- internal string ME {
- get;
- }
- public void Run ()
- {
- lock (ServicePoint) {
- if (Interlocked.CompareExchange (ref running, 1, 0) == 0)
- StartScheduler ();
- schedulerEvent.Set ();
- }
- }
- async void StartScheduler ()
- {
- idleSince = DateTime.UtcNow + TimeSpan.FromDays (3650);
- while (true) {
- Debug ($"MAIN LOOP");
- // Gather list of currently running operations.
- ValueTuple<ConnectionGroup, WebOperation>[] operationArray;
- ValueTuple<ConnectionGroup, WebConnection, Task>[] idleArray;
- var taskList = new List<Task> ();
- lock (ServicePoint) {
- Cleanup ();
- if (groups == null && defaultGroup.IsEmpty () && operations.Count == 0 && idleConnections.Count == 0) {
- Debug ($"MAIN LOOP DONE");
- running = 0;
- idleSince = DateTime.UtcNow;
- schedulerEvent.Reset ();
- return;
- }
- operationArray = new ValueTuple<ConnectionGroup, WebOperation>[operations.Count];
- operations.CopyTo (operationArray, 0);
- idleArray = new ValueTuple<ConnectionGroup, WebConnection, Task>[idleConnections.Count];
- idleConnections.CopyTo (idleArray, 0);
- taskList.Add (schedulerEvent.WaitAsync (maxIdleTime));
- foreach (var item in operationArray)
- taskList.Add (item.Item2.WaitForCompletion (true));
- foreach (var item in idleArray)
- taskList.Add (item.Item3);
- }
- Debug ($"MAIN LOOP #1: operations={operationArray.Length} idle={idleArray.Length}");
- var ret = await Task.WhenAny (taskList).ConfigureAwait (false);
- lock (ServicePoint) {
- if (ret == taskList[0]) {
- RunSchedulerIteration ();
- continue;
- }
- int idx = -1;
- for (int i = 0; i < operationArray.Length; i++) {
- if (ret == taskList[i + 1]) {
- idx = i;
- break;
- }
- }
- if (idx >= 0) {
- var item = operationArray[idx];
- Debug ($"MAIN LOOP #2: {idx} group={item.Item1.ID} Op={item.Item2.ID}");
- operations.Remove (item);
- var opTask = (Task<ValueTuple<bool, WebOperation>>)ret;
- var runLoop = OperationCompleted (item.Item1, item.Item2, opTask);
- Debug ($"MAIN LOOP #2 DONE: {idx} {runLoop}");
- if (runLoop)
- RunSchedulerIteration ();
- continue;
- }
- for (int i = 0; i < idleArray.Length; i++) {
- if (ret == taskList[i + 1 + operationArray.Length]) {
- idx = i;
- break;
- }
- }
- if (idx >= 0) {
- var item = idleArray[idx];
- Debug ($"MAIN LOOP #3: {idx} group={item.Item1.ID} Cnc={item.Item2.ID}");
- idleConnections.Remove (item);
- CloseIdleConnection (item.Item1, item.Item2);
- }
- }
- }
- }
- void Cleanup ()
- {
- if (groups != null) {
- var keys = new string[groups.Count];
- groups.Keys.CopyTo (keys, 0);
- foreach (var groupName in keys) {
- if (!groups.ContainsKey (groupName))
- continue;
- var group = groups[groupName];
- if (group.IsEmpty ()) {
- Debug ($"CLEANUP - REMOVING group={group.ID}");
- groups.Remove (groupName);
- }
- }
- if (groups.Count == 0)
- groups = null;
- }
- }
- void RunSchedulerIteration ()
- {
- schedulerEvent.Reset ();
- bool repeat;
- do {
- Debug ($"ITERATION");
- repeat = SchedulerIteration (defaultGroup);
- Debug ($"ITERATION #1: {repeat} {groups != null}");
- if (groups != null) {
- foreach (var group in groups) {
- Debug ($"ITERATION #2: group={group.Value.ID}");
- repeat |= SchedulerIteration (group.Value);
- }
- }
- Debug ($"ITERATION #3: {repeat}");
- } while (repeat);
- }
- bool OperationCompleted (ConnectionGroup group, WebOperation operation, Task<(bool, WebOperation)> task)
- {
- #if MONO_WEB_DEBUG
- var me = $"{nameof (OperationCompleted)}(group={group.ID}, Op={operation.ID}, Cnc={operation.Connection.ID})";
- #else
- string me = null;
- #endif
- var (ok, next) = task.Status == TaskStatus.RanToCompletion ? task.Result : (false, null);
- Debug ($"{me}: {task.Status} {ok} {next?.ID}");
- if (!ok || !operation.Connection.Continue (next)) {
- group.RemoveConnection (operation.Connection);
- if (next == null) {
- Debug ($"{me}: closed connection and done.");
- return true;
- }
- ok = false;
- }
- if (next == null) {
- if (ok) {
- var idleTask = Task.Delay (MaxIdleTime);
- idleConnections.AddLast ((group, operation.Connection, idleTask));
- Debug ($"{me} keeping connection open for {MaxIdleTime} milliseconds.");
- } else {
- Debug ($"{me}: closed connection and done.");
- }
- return true;
- }
- Debug ($"{me} got new operation next={next.ID}.");
- operations.AddLast ((group, next));
- if (ok) {
- Debug ($"{me} continuing next={next.ID} on same connection.");
- RemoveIdleConnection (operation.Connection);
- return false;
- }
- group.Cleanup ();
- var (connection, created) = group.CreateOrReuseConnection (next, true);
- Debug ($"{me} created new connection Cnc={connection.ID} next={next.ID}.");
- return false;
- }
- void CloseIdleConnection (ConnectionGroup group, WebConnection connection)
- {
- Debug ($"{nameof (CloseIdleConnection)}(group={group.ID}, Cnc={connection.ID}) closing idle connection.");
- group.RemoveConnection (connection);
- RemoveIdleConnection (connection);
- }
- bool SchedulerIteration (ConnectionGroup group)
- {
- #if MONO_WEB_DEBUG
- var me = $"{nameof (SchedulerIteration)}(group={group.ID})";
- #else
- string me = null;
- #endif
- Debug ($"{me}");
- // First, let's clean up.
- group.Cleanup ();
- // Is there anything in the queue?
- var next = group.GetNextOperation ();
- Debug ($"{me} no pending operations.");
- if (next == null)
- return false;
- Debug ($"{me} found pending operation Op={next.ID}");
- var (connection, created) = group.CreateOrReuseConnection (next, false);
- if (connection == null) {
- // All connections are currently busy, need to keep it in the queue for now.
- Debug ($"{me} all connections busy, keeping operation in queue.");
- return false;
- }
- Debug ($"{me} started operation: Op={next.ID} Cnc={connection.ID}");
- operations.AddLast ((group, next));
- RemoveIdleConnection (connection);
- return true;
- }
- void RemoveOperation (WebOperation operation)
- {
- var iter = operations.First;
- while (iter != null) {
- var node = iter;
- iter = iter.Next;
- if (node.Value.Item2 == operation)
- operations.Remove (node);
- }
- }
- void RemoveIdleConnection (WebConnection connection)
- {
- var iter = idleConnections.First;
- while (iter != null) {
- var node = iter;
- iter = iter.Next;
- if (node.Value.Item2 == connection)
- idleConnections.Remove (node);
- }
- }
- public void SendRequest (WebOperation operation, string groupName)
- {
- lock (ServicePoint) {
- var group = GetConnectionGroup (groupName);
- Debug ($"SEND REQUEST: Op={operation.ID} group={group.ID}");
- group.EnqueueOperation (operation);
- Run ();
- Debug ($"SEND REQUEST DONE: Op={operation.ID} group={group.ID}");
- }
- }
- public bool CloseConnectionGroup (string groupName)
- {
- lock (ServicePoint) {
- ConnectionGroup group;
- if (string.IsNullOrEmpty (groupName))
- group = defaultGroup;
- else if (groups == null || !groups.TryGetValue (groupName, out group))
- return false;
- Debug ($"CLOSE CONNECTION GROUP: group={group.ID}");
- if (group != defaultGroup) {
- groups.Remove (groupName);
- if (groups.Count == 0)
- groups = null;
- }
- group.Close ();
- Run ();
- return true;
- }
- }
- ConnectionGroup GetConnectionGroup (string name)
- {
- lock (ServicePoint) {
- if (string.IsNullOrEmpty (name))
- return defaultGroup;
- if (groups == null)
- groups = new Dictionary<string, ConnectionGroup> ();
- if (groups.TryGetValue (name, out ConnectionGroup group))
- return group;
- group = new ConnectionGroup (this, name);
- groups.Add (name, group);
- return group;
- }
- }
- void OnConnectionCreated (WebConnection connection)
- {
- Interlocked.Increment (ref currentConnections);
- }
- void OnConnectionClosed (WebConnection connection)
- {
- RemoveIdleConnection (connection);
- Interlocked.Decrement (ref currentConnections);
- }
- class ConnectionGroup
- {
- public ServicePointScheduler Scheduler {
- get;
- }
- public string Name {
- get;
- }
- public bool IsDefault => string.IsNullOrEmpty (Name);
- static int nextId;
- public readonly int ID = ++nextId;
- LinkedList<WebConnection> connections;
- LinkedList<WebOperation> queue;
- public ConnectionGroup (ServicePointScheduler scheduler, string name)
- {
- Scheduler = scheduler;
- Name = name;
- connections = new LinkedList<WebConnection> ();
- queue = new LinkedList<WebOperation> ();
- }
- public bool IsEmpty ()
- {
- return connections.Count == 0 && queue.Count == 0;
- }
- public void RemoveConnection (WebConnection connection)
- {
- Scheduler.Debug ($"REMOVING CONNECTION: group={ID} cnc={connection.ID}");
- connections.Remove (connection);
- connection.Dispose ();
- Scheduler.OnConnectionClosed (connection);
- }
- public void Cleanup ()
- {
- var iter = connections.First;
- while (iter != null) {
- var connection = iter.Value;
- var node = iter;
- iter = iter.Next;
- if (connection.Closed) {
- Scheduler.Debug ($"REMOVING CONNECTION: group={ID} cnc={connection.ID}");
- connections.Remove (node);
- Scheduler.OnConnectionClosed (connection);
- }
- }
- }
- public void Close ()
- {
- foreach (var operation in queue) {
- operation.Abort ();
- Scheduler.RemoveOperation (operation);
- }
- queue.Clear ();
- foreach (var connection in connections) {
- connection.Dispose ();
- Scheduler.OnConnectionClosed (connection);
- }
- connections.Clear ();
- }
- public void EnqueueOperation (WebOperation operation)
- {
- queue.AddLast (operation);
- }
- public WebOperation GetNextOperation ()
- {
- // Is there anything in the queue?
- var iter = queue.First;
- while (iter != null) {
- var operation = iter.Value;
- var node = iter;
- iter = iter.Next;
- if (operation.Aborted) {
- queue.Remove (node);
- Scheduler.RemoveOperation (operation);
- continue;
- }
- return operation;
- }
- return null;
- }
- public WebConnection FindIdleConnection (WebOperation operation)
- {
- // First let's find the ideal candidate.
- WebConnection candidate = null;
- foreach (var connection in connections) {
- if (connection.CanReuseConnection (operation)) {
- if (candidate == null || connection.IdleSince > candidate.IdleSince)
- candidate = connection;
- }
- }
- // Found one? Make sure it's actually willing to run it.
- if (candidate != null && candidate.StartOperation (operation, true)) {
- queue.Remove (operation);
- return candidate;
- }
- // Ok, let's loop again and pick the first one that accepts the new operation.
- foreach (var connection in connections) {
- if (connection.StartOperation (operation, true)) {
- queue.Remove (operation);
- return connection;
- }
- }
- return null;
- }
- public (WebConnection connection, bool created) CreateOrReuseConnection (WebOperation operation, bool force)
- {
- var connection = FindIdleConnection (operation);
- if (connection != null)
- return (connection, false);
- if (force || Scheduler.ServicePoint.ConnectionLimit > connections.Count || connections.Count == 0) {
- connection = new WebConnection (Scheduler.ServicePoint);
- connection.StartOperation (operation, false);
- connections.AddFirst (connection);
- Scheduler.OnConnectionCreated (connection);
- queue.Remove (operation);
- return (connection, true);
- }
- return (null, false);
- }
- }
- // https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
- class AsyncManualResetEvent
- {
- volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool> ();
- public Task WaitAsync () { return m_tcs.Task; }
- public bool WaitOne (int millisecondTimeout)
- {
- WebConnection.Debug ($"AMRE WAIT ONE: {millisecondTimeout}");
- return m_tcs.Task.Wait (millisecondTimeout);
- }
- public async Task<bool> WaitAsync (int millisecondTimeout)
- {
- var timeoutTask = Task.Delay (millisecondTimeout);
- var ret = await Task.WhenAny (m_tcs.Task, timeoutTask).ConfigureAwait (false);
- return ret != timeoutTask;
- }
- public void Set ()
- {
- var tcs = m_tcs;
- Task.Factory.StartNew (s => ((TaskCompletionSource<bool>)s).TrySetResult (true),
- tcs, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
- tcs.Task.Wait ();
- }
- public void Reset ()
- {
- while (true) {
- var tcs = m_tcs;
- if (!tcs.Task.IsCompleted ||
- Interlocked.CompareExchange (ref m_tcs, new TaskCompletionSource<bool> (), tcs) == tcs)
- return;
- }
- }
- public AsyncManualResetEvent (bool state)
- {
- if (state)
- Set ();
- }
- }
- }
- }
|