Просмотр исходного кода

[c#/beetlex] udpate to v1.6.0.1-beta (#6189)

* update beetlex 1.4.3

update beetlex 1.4.3

* docker add COMPlus_ReadyToRun variable
update beetlex

* update beetlex, enabled thread queue

* beetlex framework add db and queries cases

* add db code

* change result json data

* update query url

* beetlex framework add fortunes cases

* change Content-Type

* add beetlex core cases

* fix queries cases

* update config

* change try readline

* update benchmark config

* Update README.md

* Update README.md

* change versus property

* beetlex-core update .net core to v3.0

* change beetlex-core project file

* beetlex update raw db class

* beetlex update raw db

* beetlex debug plaintext

* change debug docker file

* update beetlex to 1.4.0

* update

* beetlex update core 3.1

* [c#/beetlex] add updates cases

* [c#/beetlex] change Server: TFB, change custom connection pool, add update docker

* fix errors

* change pool init

* change connection pool maxsize

* fix fortunes errors

* clear DBRaw _connectionString value.

* [c#beetlex] change update dbconnection pool size

* [c#/beetlex] udpate spanjson to v3.0.1, Npgsql v5.0.0

* [c#/beetlex] add caching sample

* set connectionstring multiplexing

* remove connection multiplexing setting

* [c#/beetlex]change NpgsqlParameter to  NpgsqlParameter<T>

* [c#/beetlex] update dbraw

* [c#/beetlex] change connection string

* [c#/beetlex]  add fortunes cases to core-updb

* update beetlex 1.5.6

* update 5.0.0-alpha1

* update docker file

* Enabled IOQueues

* Set IOQueues debug mode

* update

* [c#/beetlex] udpate to v1.6.0.1-beta

* update pg drive

* [c#/beetlex] update to beetlex v1.6.3 and support pipelining

* set options
Henry 4 лет назад
Родитель
Сommit
a0a8c04e54

+ 2 - 2
frameworks/CSharp/beetlex/PlatformBenchmarks/Caching.cs

@@ -4,13 +4,13 @@ using SpanJson;
 using System;
 using System.Collections.Generic;
 using System.Text;
-
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class HttpHandler
     {
-        public async void caching(string queryString, PipeStream stream, HttpToken token, ISession session)
+        public async Task caching(string queryString, PipeStream stream, HttpToken token, ISession session)
         {
             int count = 1;
             if (!string.IsNullOrEmpty(queryString))

+ 88 - 30
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpHandler.cs

@@ -49,20 +49,17 @@ namespace PlatformBenchmarks
 
         private static byte _question = 63;
 
-        private NextQueueGroup NextQueueGroup;
-
         public HttpHandler()
         {
-            int threads = System.Math.Min(Environment.ProcessorCount, 16);
-            NextQueueGroup = new NextQueueGroup(threads);
 
         }
 
-        public void Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
+        public Task Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
         {
             stream.Write("<b> beetlex server</b><hr/>");
             stream.Write($"{Encoding.ASCII.GetString(url)} not found!");
             OnCompleted(stream, session, token);
+            return Task.CompletedTask;
         }
 
         public override void Connected(IServer server, ConnectedEventArgs e)
@@ -84,11 +81,82 @@ namespace PlatformBenchmarks
             return -1;
 
         }
-        private void OnProcess(PipeStream pipeStream, HttpToken token, ISession sessino)
+
+        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
+        {
+            base.SessionReceive(server, e);
+            PipeStream pipeStream = e.Session.Stream.ToPipeStream();
+            HttpToken token = (HttpToken)e.Session.Tag;
+            var result = pipeStream.IndexOf(_line.Data);
+            while (result.End != null)
+            {
+                if (result.Length == 2)
+                {
+                    if (token.CurrentRequest != null)
+                    {
+                        token.Requests.Enqueue(token.CurrentRequest);
+                        token.CurrentRequest = null;
+                    }
+                    pipeStream.ReadFree(result.Length);
+                }
+                else
+                {
+                    if (token.CurrentRequest == null)
+                    {
+                        token.CurrentRequest = new RequestData();
+                        var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(result.Length);
+                        pipeStream.Read(buffer, 0, result.Length);
+                        token.CurrentRequest.Data = new ArraySegment<byte>(buffer, 0, result.Length);
+                    }
+                    else
+                    {
+                        pipeStream.ReadFree(result.Length);
+                    }
+                }
+                if (pipeStream.Length > 0)
+                    result = pipeStream.IndexOf(_line.Data);
+                else
+                    break;
+            }
+            if (pipeStream.Length == 0 && token.CurrentRequest == null)
+            {
+                ProcessReqeusts(token, pipeStream, e.Session);
+            }
+        }
+
+        private async Task ProcessReqeusts(HttpToken token, PipeStream pipeStream, ISession session)
+        {
+        PROCESS:
+            if (token.EnterProcess())
+            {
+                while (true)
+                {
+                    if (token.Requests.TryDequeue(out RequestData item))
+                    {
+                        using (item)
+                        {
+                            await OnProcess(item, pipeStream, token, session);
+                        }
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+                session.Stream.Flush();
+                token.CompletedProcess();
+                if (!token.Requests.IsEmpty)
+                {
+                    goto PROCESS;
+                }
+            }
+        }
+
+        private Task OnProcess(RequestData requestData, PipeStream pipeStream, HttpToken token, ISession sessino)
         {
             var line = _line.AsSpan();
-            int len = (int)pipeStream.FirstBuffer.Length;
-            var receiveData = pipeStream.FirstBuffer.Memory.Span;
+            int len = requestData.Data.Count;
+            var receiveData = requestData.GetSpan();
             ReadOnlySpan<byte> http = line;
             ReadOnlySpan<byte> method = line;
             ReadOnlySpan<byte> url = line;
@@ -119,21 +187,14 @@ namespace PlatformBenchmarks
                     }
                 }
             }
-            OnStartLine(http, method, url, sessino, token, pipeStream);
-        }
+            return OnStartLine(http, method, url, sessino, token, pipeStream);
 
-        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
-        {
-            base.SessionReceive(server, e);
-            PipeStream pipeStream = e.Session.Stream.ToPipeStream();
-            HttpToken token = (HttpToken)e.Session.Tag;
-            OnProcess(pipeStream, token, e.Session);
         }
 
 
-
-        public virtual void OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
+        public virtual Task OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
         {
+
             int queryIndex = AnalysisUrl(url);
             ReadOnlySpan<byte> baseUrl = default;
             ReadOnlySpan<byte> queryString = default;
@@ -151,51 +212,51 @@ namespace PlatformBenchmarks
             {
                 stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
                 OnWriteContentLength(stream, token);
-                Plaintext(url, stream, token, session);
+                return Plaintext(url, stream, token, session);
             }
             else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
-                Json(url, stream, token, session);
+                return Json(stream, token, session);
             }
             else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
-                db(stream, token, session);
+                return db(stream, token, session);
             }
             else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
-                queries(Encoding.ASCII.GetString(queryString), stream, token, session);
+                return queries(Encoding.ASCII.GetString(queryString), stream, token, session);
             }
 
             else if (baseUrl.Length == _cached_worlds.Length && baseUrl.StartsWith(_cached_worlds))
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
-                caching(Encoding.ASCII.GetString(queryString), stream, token, session);
+                return caching(Encoding.ASCII.GetString(queryString), stream, token, session);
             }
 
             else if (baseUrl.Length == _path_Updates.Length && baseUrl.StartsWith(_path_Updates))
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
-                updates(Encoding.ASCII.GetString(queryString), stream, token, session);
+                return updates(Encoding.ASCII.GetString(queryString), stream, token, session);
             }
             else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
             {
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 OnWriteContentLength(stream, token);
-                fortunes(stream, token, session);
+                return fortunes(stream, token, session);
             }
             else
             {
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 OnWriteContentLength(stream, token);
-                Default(url, stream, token, session);
+                return Default(url, stream, token, session);
             }
         }
 
@@ -219,11 +280,8 @@ namespace PlatformBenchmarks
         {
             stream.ReadFree((int)stream.Length);
             token.FullLength((stream.CacheLength - token.ContentPostion).ToString());
-            session.Stream.Flush();
-        }
-
-
 
+        }
 
     }
 }

+ 21 - 7
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpToken.cs

@@ -1,23 +1,25 @@
 using BeetleX.Buffers;
 using System;
-using System.Collections.Generic;
+using System.Collections.Concurrent;
 using System.Text;
 
 namespace PlatformBenchmarks
 {
     public class HttpToken
     {
-        private byte[] mLengthBuffer = new byte[10];   
+        private byte[] mLengthBuffer = new byte[10];
 
         public RawDb Db { get; set; }
 
-        public NextQueue NextQueue { get; set; }
-
         public HttpToken()
         {
-            
+
         }
 
+        public ConcurrentQueue<RequestData> Requests { get; set; } = new ConcurrentQueue<RequestData>();
+
+        public RequestData CurrentRequest { get; set; }
+
         public byte[] GetLengthBuffer(string length)
         {
             Encoding.ASCII.GetBytes(length, 0, length.Length, mLengthBuffer, 0);
@@ -30,13 +32,25 @@ namespace PlatformBenchmarks
 
         public int ContentPostion { get; set; }
 
-        public  MemoryBlockCollection ContentLength { get; set; }
+        public MemoryBlockCollection ContentLength { get; set; }
 
         public void FullLength(string length)
         {
             var item = GetLengthBuffer(length);
             ContentLength.Full(item);
         }
-        
+
+        private int mProcessStatus = 0;
+
+        public void CompletedProcess()
+        {
+            System.Threading.Interlocked.Exchange(ref mProcessStatus, 0);
+        }
+
+        public bool EnterProcess()
+        {
+            return System.Threading.Interlocked.CompareExchange(ref mProcessStatus, 1, 0) == 0;
+        }
+
     }
 }

+ 0 - 108
frameworks/CSharp/beetlex/PlatformBenchmarks/NextQueue.cs

@@ -1,108 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace PlatformBenchmarks
-{
-    public class NextQueue : IDisposable
-    {
-        public NextQueue()
-        {
-            mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();
-          
-        }
-
-        private readonly object _workSync = new object();
-
-        private bool _doingWork;
-
-        private int mCount;
-
-        private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;
-
-        public int Count => System.Threading.Interlocked.Add(ref mCount, 0);
-
-        public void Enqueue(IEventWork item)
-        {
-            mQueue.Enqueue(item);
-            System.Threading.Interlocked.Increment(ref mCount);
-            lock (_workSync)
-            {
-                if (!_doingWork)
-                {
-                    System.Threading.ThreadPool.QueueUserWorkItem(OnStart);
-                    _doingWork = true;
-                }
-            }
-        }
-
-        private void OnError(Exception e, IEventWork work)
-        {
-            try
-            {
-                Error?.Invoke(e, work);
-            }
-            catch
-            {
-
-            }
-        }
-
-        public static Action<Exception, IEventWork> Error { get; set; }
-
-        private async void OnStart(object state)
-        {
-            while (true)
-            {
-                while (mQueue.TryDequeue(out IEventWork item))
-                {
-                    System.Threading.Interlocked.Decrement(ref mCount);
-                    using (item)
-                    {
-                        try
-                        {
-                            await item.Execute();
-                        }
-                        catch (Exception e_)
-                        {
-                            OnError(e_, item);
-                        }
-                    }
-                }
-                lock (_workSync)
-                {
-                    if (mQueue.IsEmpty)
-                    {
-                        try
-                        {
-                            Unused?.Invoke();
-                        }
-                        catch { }
-                        _doingWork = false;
-                        return;
-                    }
-                }
-            }
-
-        }
-
-        public Action Unused { get; set; }
-
-        public void Dispose()
-        {
-            while (mQueue.TryDequeue(out IEventWork work))
-            {
-                try
-                {
-                    work.Dispose();
-                }
-                catch
-                {
-
-                }
-
-            }
-        }
-    }
-}

+ 0 - 57
frameworks/CSharp/beetlex/PlatformBenchmarks/NextQueueGroup.cs

@@ -1,57 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace PlatformBenchmarks
-{
-    public class NextQueueGroup
-    {
-
-        private List<NextQueue> mQueues = new List<NextQueue>();
-
-        public NextQueueGroup(int count = 0)
-        {
-            if (count == 0)
-            {
-                count = Math.Min(Environment.ProcessorCount, 16);
-            }
-
-            for (int i = 0; i < count; i++)
-                mQueues.Add(new NextQueue());
-        }
-
-        private long mIndex = 0;
-
-        public void Enqueue(IEventWork item, int waitLength = 5)
-        {
-            for (int i = 0; i < mQueues.Count; i++)
-            {
-                if (mQueues[i].Count < waitLength)
-                {
-                    mQueues[i].Enqueue(item);
-                    return;
-                }
-            }
-            Next().Enqueue(item);
-        }
-
-        public NextQueue Next(int waitLength)
-        {
-            for (int i = 0; i < mQueues.Count; i++)
-            {
-                if (mQueues[i].Count < waitLength)
-                {
-                    return mQueues[i];
-
-                }
-            }
-            return Next();
-        }
-
-        public NextQueue Next()
-        {
-            var index = System.Threading.Interlocked.Increment(ref mIndex);
-            return mQueues[(int)(index % mQueues.Count)];
-        }
-    }
-}

+ 5 - 5
frameworks/CSharp/beetlex/PlatformBenchmarks/PlatformBenchmarks.csproj

@@ -8,11 +8,11 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="BeetleX" Version="1.5.6.1114" />
-    <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="5.0.0-rc.1.20451.14" />
-    <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0-rc.1.20451.14" />
-    <PackageReference Include="Npgsql" Version="5.0.0-alpha1" />
-    <PackageReference Include="SpanJson" Version="3.0.1" />
+    <PackageReference Include="BeetleX" Version="1.6.3" />
+    <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="5.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
+    <PackageReference Include="Npgsql" Version="5.0.0" />
+    <PackageReference Include="SpanJson" Version="3.1.0" />
   </ItemGroup>
 
 </Project>

+ 23 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/RequestData.cs

@@ -0,0 +1,23 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace PlatformBenchmarks
+{
+    public class RequestData : IDisposable
+    {
+        public ArraySegment<byte> Data { get; set; }
+
+        public ReadOnlySpan<byte> GetSpan()
+        {
+            return new ReadOnlySpan<byte>(Data.Array, Data.Offset, Data.Count);
+        }
+
+        public void Dispose()
+        {
+            System.Buffers.ArrayPool<byte>.Shared.Return(Data.Array);
+        }
+    }
+}

+ 2 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/db.cs

@@ -4,13 +4,14 @@ using SpanJson;
 using System;
 using System.Collections.Generic;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class HttpHandler
     {
 
-        public async void db(PipeStream stream, HttpToken token, ISession session)
+        public async Task db(PipeStream stream, HttpToken token, ISession session)
         {
             try
             {

+ 2 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/fortunes.cs

@@ -4,6 +4,7 @@ using System;
 using System.Collections.Generic;
 using System.Globalization;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
@@ -16,7 +17,7 @@ namespace PlatformBenchmarks
         private readonly static AsciiString _fortunesRowEnd = "</td></tr>";
         private readonly static AsciiString _fortunesTableEnd = "</table></body></html>";
 
-        public async void fortunes(PipeStream stream, HttpToken token, ISession session)
+        public async Task fortunes(PipeStream stream, HttpToken token, ISession session)
         {
             try
             {

+ 3 - 2
frameworks/CSharp/beetlex/PlatformBenchmarks/json.cs

@@ -4,16 +4,17 @@ using SpanJson;
 using System;
 using System.Collections.Generic;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class HttpHandler
     {
-        public void Json(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
+        public async Task Json(PipeStream stream, HttpToken token, ISession session)
         {
             JsonMessage jsonMessage = default(JsonMessage);
             jsonMessage.message = "Hello, World!";
-            JsonSerializer.NonGeneric.Utf8.SerializeAsync(jsonMessage, stream);
+            await JsonSerializer.NonGeneric.Utf8.SerializeAsync(jsonMessage, stream);
             OnCompleted(stream, session, token);
         }
     }

+ 3 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/plaintext.cs

@@ -3,15 +3,17 @@ using BeetleX.Buffers;
 using System;
 using System.Collections.Generic;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class HttpHandler
     {
-        public void Plaintext(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
+        public Task Plaintext(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
         {
             stream.Write(_result_plaintext.Data, 0, _result_plaintext.Length);
             OnCompleted(stream, session, token);
+            return Task.CompletedTask;
         }
     }
 }

+ 2 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/queries.cs

@@ -4,12 +4,13 @@ using SpanJson;
 using System;
 using System.Collections.Generic;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class  HttpHandler
     {
-        public async void queries(string queryString, PipeStream stream, HttpToken token, ISession session)
+        public async Task queries(string queryString, PipeStream stream, HttpToken token, ISession session)
         {
             int count = 1;
             if(!string.IsNullOrEmpty(queryString))

+ 2 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/updates.cs

@@ -4,12 +4,13 @@ using SpanJson;
 using System;
 using System.Collections.Generic;
 using System.Text;
+using System.Threading.Tasks;
 
 namespace PlatformBenchmarks
 {
     public partial class HttpHandler
     {
-        public async void updates(string queryString, PipeStream stream, HttpToken token, ISession session)
+        public async Task updates(string queryString, PipeStream stream, HttpToken token, ISession session)
         {
             int count = 1;
             if (!string.IsNullOrEmpty(queryString))