Browse Source

[c#/beetlex] Optimized actions (#6228)

* update beetlex 1.4.3

update beetlex 1.4.3

* docker add COMPlus_ReadyToRun variable
update beetlex

* update beetlex, enabled thread queue

* beetlex framework add db and queries cases

* add db code

* change result json data

* update query url

* beetlex framework add fortunes cases

* change Content-Type

* add beetlex core cases

* fix queries cases

* update config

* change try readline

* update benchmark config

* Update README.md

* Update README.md

* change versus property

* beetlex-core update .net core to v3.0

* change beetlex-core project file

* beetlex update raw db class

* beetlex update raw db

* beetlex debug plaintext

* change debug docker file

* update beetlex to 1.4.0

* update

* beetlex update core 3.1

* [c#/beetlex] add updates cases

* [c#/beetlex] change Server: TFB, change custom connection pool, add update docker

* fix errors

* change pool init

* change connection pool maxsize

* fix fortunes errors

* clear DBRaw _connectionString value.

* [c#beetlex] change update dbconnection pool size

* [c#/beetlex] udpate spanjson to v3.0.1, Npgsql v5.0.0

* [c#/beetlex] add caching sample

* set connectionstring multiplexing

* remove connection multiplexing setting

* [c#/beetlex]change NpgsqlParameter to  NpgsqlParameter<T>

* [c#/beetlex] update dbraw

* [c#/beetlex] change connection string

* [c#/beetlex]  add fortunes cases to core-updb

* update beetlex 1.5.6

* update 5.0.0-alpha1

* update docker file

* Enabled IOQueues

* Set IOQueues debug mode

* update

* [c#/beetlex] udpate to v1.6.0.1-beta

* update pg drive

* [c#/beetlex] update to beetlex v1.6.3 and support pipelining

* set options

* [c#/beetlex] Optimized actions
Henry 4 years ago
parent
commit
9cfd171923

+ 97 - 64
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpHandler.cs

@@ -51,13 +51,15 @@ namespace PlatformBenchmarks
 
 
         public HttpHandler()
         public HttpHandler()
         {
         {
-
+            RequestDispatchs = new BeetleX.Dispatchs.DispatchCenter<HttpToken>(OnRequest, Math.Min(Environment.ProcessorCount, 16));
         }
         }
 
 
-        public Task Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
+        private BeetleX.Dispatchs.DispatchCenter<HttpToken> RequestDispatchs;
+
+        public Task Default(PipeStream stream, HttpToken token, ISession session)
         {
         {
             stream.Write("<b> beetlex server</b><hr/>");
             stream.Write("<b> beetlex server</b><hr/>");
-            stream.Write($"{Encoding.ASCII.GetString(url)} not found!");
+            stream.Write("path not found!");
             OnCompleted(stream, session, token);
             OnCompleted(stream, session, token);
             return Task.CompletedTask;
             return Task.CompletedTask;
         }
         }
@@ -67,6 +69,8 @@ namespace PlatformBenchmarks
             base.Connected(server, e);
             base.Connected(server, e);
             e.Session.Socket.NoDelay = true;
             e.Session.Socket.NoDelay = true;
             var token = new HttpToken();
             var token = new HttpToken();
+            token.ThreadDispatcher = RequestDispatchs.Next();
+            token.Session = e.Session;
             token.Db = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
             token.Db = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
             e.Session.Tag = token;
             e.Session.Tag = token;
         }
         }
@@ -82,6 +86,14 @@ namespace PlatformBenchmarks
 
 
         }
         }
 
 
+        private void OnRequest(HttpToken token)
+        {
+            if (token.Requests.TryDequeue(out RequestData result))
+            {
+                OnStartRequest(result, token.Session, token, token.Session.Stream.ToPipeStream());
+            }
+        }
+
         public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
         public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
         {
         {
             base.SessionReceive(server, e);
             base.SessionReceive(server, e);
@@ -96,6 +108,7 @@ namespace PlatformBenchmarks
                     {
                     {
                         token.Requests.Enqueue(token.CurrentRequest);
                         token.Requests.Enqueue(token.CurrentRequest);
                         token.CurrentRequest = null;
                         token.CurrentRequest = null;
+                        token.ThreadDispatcher.Enqueue(token);
                     }
                     }
                     pipeStream.ReadFree(result.Length);
                     pipeStream.ReadFree(result.Length);
                 }
                 }
@@ -103,10 +116,26 @@ namespace PlatformBenchmarks
                 {
                 {
                     if (token.CurrentRequest == null)
                     if (token.CurrentRequest == null)
                     {
                     {
-                        token.CurrentRequest = new RequestData();
-                        var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(result.Length);
+                        var request = new RequestData();
+
+                        byte[] buffer = null;
+                        if (Program.Debug)
+                            buffer = new byte[result.Length];
+                        else
+                            buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(result.Length);
                         pipeStream.Read(buffer, 0, result.Length);
                         pipeStream.Read(buffer, 0, result.Length);
-                        token.CurrentRequest.Data = new ArraySegment<byte>(buffer, 0, result.Length);
+                        request.Data = new ArraySegment<byte>(buffer, 0, result.Length);
+                        AnalysisAction(request);
+                        if (request.Action == ActionType.Plaintext)
+                        {
+                            token.CurrentRequest = request;
+                        }
+                        else
+                        {
+                            pipeStream.ReadFree((int)pipeStream.Length);
+                            OnStartRequest(request, e.Session, token, pipeStream);
+                            return;
+                        }
                     }
                     }
                     else
                     else
                     {
                     {
@@ -118,41 +147,9 @@ namespace PlatformBenchmarks
                 else
                 else
                     break;
                     break;
             }
             }
-            if (pipeStream.Length == 0 && token.CurrentRequest == null)
-            {
-                ProcessReqeusts(token, pipeStream, e.Session);
-            }
-        }
-
-        private async Task ProcessReqeusts(HttpToken token, PipeStream pipeStream, ISession session)
-        {
-        PROCESS:
-            if (token.EnterProcess())
-            {
-                while (true)
-                {
-                    if (token.Requests.TryDequeue(out RequestData item))
-                    {
-                        using (item)
-                        {
-                            await OnProcess(item, pipeStream, token, session);
-                        }
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-                session.Stream.Flush();
-                token.CompletedProcess();
-                if (!token.Requests.IsEmpty)
-                {
-                    goto PROCESS;
-                }
-            }
         }
         }
 
 
-        private Task OnProcess(RequestData requestData, PipeStream pipeStream, HttpToken token, ISession sessino)
+        private void AnalysisAction(RequestData requestData)
         {
         {
             var line = _line.AsSpan();
             var line = _line.AsSpan();
             int len = requestData.Data.Count;
             int len = requestData.Data.Count;
@@ -187,14 +184,6 @@ namespace PlatformBenchmarks
                     }
                     }
                 }
                 }
             }
             }
-            return OnStartLine(http, method, url, sessino, token, pipeStream);
-
-        }
-
-
-        public virtual Task OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
-        {
-
             int queryIndex = AnalysisUrl(url);
             int queryIndex = AnalysisUrl(url);
             ReadOnlySpan<byte> baseUrl = default;
             ReadOnlySpan<byte> baseUrl = default;
             ReadOnlySpan<byte> queryString = default;
             ReadOnlySpan<byte> queryString = default;
@@ -202,62 +191,105 @@ namespace PlatformBenchmarks
             {
             {
                 baseUrl = url.Slice(0, queryIndex);
                 baseUrl = url.Slice(0, queryIndex);
                 queryString = url.Slice(queryIndex + 1, url.Length - queryIndex - 1);
                 queryString = url.Slice(queryIndex + 1, url.Length - queryIndex - 1);
+                requestData.QueryString = Encoding.ASCII.GetString(queryString);
             }
             }
             else
             else
             {
             {
                 baseUrl = url;
                 baseUrl = url;
             }
             }
-            OnWriteHeader(stream, token);
             if (baseUrl.Length == _path_Plaintext.Length && baseUrl.StartsWith(_path_Plaintext))
             if (baseUrl.Length == _path_Plaintext.Length && baseUrl.StartsWith(_path_Plaintext))
+            {
+                requestData.Action = ActionType.Plaintext;
+            }
+            else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
+            {
+                requestData.Action = ActionType.Json;
+            }
+            else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
+            {
+                requestData.Action = ActionType.Db;
+            }
+            else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
+            {
+                requestData.Action = ActionType.Queries;
+            }
+
+            else if (baseUrl.Length == _cached_worlds.Length && baseUrl.StartsWith(_cached_worlds))
+            {
+                requestData.Action = ActionType.Caching;
+            }
+
+            else if (baseUrl.Length == _path_Updates.Length && baseUrl.StartsWith(_path_Updates))
+            {
+                requestData.Action = ActionType.Updates;
+            }
+            else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
+            {
+                requestData.Action = ActionType.Fortunes;
+            }
+            else
+            {
+                requestData.Action = ActionType.Other;
+            }
+        }
+
+        public virtual async Task OnStartRequest(RequestData data, ISession session, HttpToken token, PipeStream stream)
+        {
+            OnWriteHeader(stream, token);
+            ActionType type = data.Action;
+            if (type == ActionType.Plaintext)
             {
             {
                 stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
                 stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return Plaintext(url, stream, token, session);
+                await Plaintext(stream, token, session);
             }
             }
-            else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
+            else if (type == ActionType.Json)
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return Json(stream, token, session);
+                await Json(stream, token, session);
             }
             }
-            else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
+            else if (type == ActionType.Db)
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return db(stream, token, session);
+                await db(stream, token, session);
             }
             }
-            else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
+            else if (type == ActionType.Queries)
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return queries(Encoding.ASCII.GetString(queryString), stream, token, session);
+                await queries(data.QueryString, stream, token, session);
             }
             }
 
 
-            else if (baseUrl.Length == _cached_worlds.Length && baseUrl.StartsWith(_cached_worlds))
+            else if (type == ActionType.Caching)
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return caching(Encoding.ASCII.GetString(queryString), stream, token, session);
+                await caching(data.QueryString, stream, token, session);
             }
             }
 
 
-            else if (baseUrl.Length == _path_Updates.Length && baseUrl.StartsWith(_path_Updates))
+            else if (type == ActionType.Updates)
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return updates(Encoding.ASCII.GetString(queryString), stream, token, session);
+                await updates(data.QueryString, stream, token, session);
             }
             }
-            else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
+            else if (type == ActionType.Fortunes)
             {
             {
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return fortunes(stream, token, session);
+                await fortunes(stream, token, session);
             }
             }
             else
             else
             {
             {
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
-                return Default(url, stream, token, session);
+                await Default(stream, token, session);
             }
             }
+            if (!Program.Debug)
+                data.Dispose();
+
         }
         }
 
 
         private void OnWriteHeader(PipeStream stream, HttpToken token)
         private void OnWriteHeader(PipeStream stream, HttpToken token)
@@ -278,9 +310,10 @@ namespace PlatformBenchmarks
 
 
         private void OnCompleted(PipeStream stream, ISession session, HttpToken token)
         private void OnCompleted(PipeStream stream, ISession session, HttpToken token)
         {
         {
-            stream.ReadFree((int)stream.Length);
-            token.FullLength((stream.CacheLength - token.ContentPostion).ToString());
 
 
+            token.FullLength((stream.CacheLength - token.ContentPostion).ToString());
+            if (token.Requests.IsEmpty)
+                session.Stream.Flush();
         }
         }
 
 
     }
     }

+ 2 - 7
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpServer.cs

@@ -23,12 +23,6 @@ namespace PlatformBenchmarks
             serverOptions.BufferSize = 1024 * 8;
             serverOptions.BufferSize = 1024 * 8;
             serverOptions.BufferPoolMaxMemory = 1000;
             serverOptions.BufferPoolMaxMemory = 1000;
             serverOptions.BufferPoolSize = 1024 * 10;
             serverOptions.BufferPoolSize = 1024 * 10;
-            if (Program.Debug)
-            {
-                serverOptions.IOQueueEnabled = true;
-                serverOptions.IOQueues = System.Math.Min(Environment.ProcessorCount, 16);
-                serverOptions.SyncAccept = false;
-            }
             ApiServer = SocketFactory.CreateTcpServer<HttpHandler>(serverOptions);
             ApiServer = SocketFactory.CreateTcpServer<HttpHandler>(serverOptions);
             ApiServer.Open();
             ApiServer.Open();
             if (!Program.UpDB)
             if (!Program.UpDB)
@@ -38,8 +32,9 @@ namespace PlatformBenchmarks
             }
             }
             else
             else
             {
             {
-                // RawDb._connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
+
                 RawDb._connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3;Multiplexing=true;Write Coalescing Delay Us=500;Write Coalescing Buffer Threshold Bytes=1000";
                 RawDb._connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3;Multiplexing=true;Write Coalescing Delay Us=500;Write Coalescing Buffer Threshold Bytes=1000";
+                //RawDb._connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             }
             }
             ApiServer.Log(LogType.Info, null, $"Debug mode [{Program.Debug}]");
             ApiServer.Log(LogType.Info, null, $"Debug mode [{Program.Debug}]");
             return Task.CompletedTask;
             return Task.CompletedTask;

+ 7 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpToken.cs

@@ -1,4 +1,6 @@
-using BeetleX.Buffers;
+using BeetleX;
+using BeetleX.Buffers;
+using BeetleX.Dispatchs;
 using System;
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Concurrent;
 using System.Text;
 using System.Text;
@@ -16,8 +18,12 @@ namespace PlatformBenchmarks
 
 
         }
         }
 
 
+        public SingleThreadDispatcher<HttpToken> ThreadDispatcher { get; set; }
+
         public ConcurrentQueue<RequestData> Requests { get; set; } = new ConcurrentQueue<RequestData>();
         public ConcurrentQueue<RequestData> Requests { get; set; } = new ConcurrentQueue<RequestData>();
 
 
+        public ISession Session { get; set; }
+
         public RequestData CurrentRequest { get; set; }
         public RequestData CurrentRequest { get; set; }
 
 
         public byte[] GetLengthBuffer(string length)
         public byte[] GetLengthBuffer(string length)

+ 19 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/RequestData.cs

@@ -17,7 +17,26 @@ namespace PlatformBenchmarks
 
 
         public void Dispose()
         public void Dispose()
         {
         {
+
             System.Buffers.ArrayPool<byte>.Shared.Return(Data.Array);
             System.Buffers.ArrayPool<byte>.Shared.Return(Data.Array);
         }
         }
+
+        public string QueryString { get; set; }
+
+        public ActionType Action { get; set; }
+
+
+    }
+
+    public enum ActionType
+    {
+        Plaintext,
+        Json,
+        Db,
+        Queries,
+        Caching,
+        Updates,
+        Fortunes,
+        Other
     }
     }
 }
 }

+ 1 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/plaintext.cs

@@ -9,7 +9,7 @@ namespace PlatformBenchmarks
 {
 {
     public partial class HttpHandler
     public partial class HttpHandler
     {
     {
-        public Task Plaintext(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
+        public Task Plaintext(PipeStream stream, HttpToken token, ISession session)
         {
         {
             stream.Write(_result_plaintext.Data, 0, _result_plaintext.Length);
             stream.Write(_result_plaintext.Data, 0, _result_plaintext.Length);
             OnCompleted(stream, session, token);
             OnCompleted(stream, session, token);