Browse Source

beetlex update raw db (#5040)

* docker add COMPlus_ReadyToRun variable
update beetlex

* update beetlex, enabled thread queue

* beetlex framework add db and queries cases

* add db code

* change result json data

* update query url

* beetlex framework add fortunes cases

* change Content-Type

* add beetlex core cases

* fix queries cases

* update config

* change try readline

* update benchmark config

* Update README.md

* Update README.md

* change versus property

* beetlex-core update .net core to v3.0

* change beetlex-core project file

* beetlex update raw db class

* beetlex update raw db
Henry 6 years ago
parent
commit
8f8c559786

+ 38 - 77
frameworks/CSharp/beetlex/Benchmarks/DBRaw.cs

@@ -22,82 +22,37 @@ namespace Benchmarks
             _dbProviderFactory = dbProviderFactory;
             _dbProviderFactory = dbProviderFactory;
             _connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             _connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             //_connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             //_connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
-
-
-            for (int i = 0; i < 256; i++)
-            {
-                DbConnection conn = dbProviderFactory.CreateConnection();
-                conn.ConnectionString = _connectionString;
-                RawDbConnection rawDbConnection = new RawDbConnection(conn, this);
-                mPool.Push(rawDbConnection);
-            }
-        }
-
-
-        private ConcurrentStack<RawDbConnection> mPool = new ConcurrentStack<RawDbConnection>();
-
-
-        private RawDbConnection Pop()
-        {
-            if (mPool.TryPop(out RawDbConnection conn))
-                return conn;
-            else
-                throw new Exception("get raw db connection error!");
+            OnCreateCommand();
         }
         }
 
 
-        private void Push(RawDbConnection conn)
+        private void OnCreateCommand()
         {
         {
-            mPool.Push(conn);
+            SingleCommand = new Npgsql.NpgsqlCommand();
+            SingleCommand.CommandText = "SELECT id, randomnumber FROM world WHERE id = @Id";
+            var id = SingleCommand.CreateParameter();
+            id.ParameterName = "@Id";
+            id.DbType = DbType.Int32;
+            id.Value = _random.Next(1, 10001);
+            SingleCommand.Parameters.Add(id);
+            FortuneCommand = new Npgsql.NpgsqlCommand();
+            FortuneCommand.CommandText = "SELECT id, message FROM fortune";
         }
         }
 
 
+        private DbCommand SingleCommand;
 
 
-        class RawDbConnection : IDisposable
-        {
-            public RawDbConnection(DbConnection connection, RawDb rawdb)
-            {
-                Connection = connection;
-                Connection.Open();
-
-                var cmd = connection.CreateCommand();
-                cmd.CommandText = "SELECT id, randomnumber FROM world WHERE id = @Id";
-                var id = cmd.CreateParameter();
-                id.ParameterName = "@Id";
-                id.DbType = DbType.Int32;
-                id.Value = 0;
-                cmd.Parameters.Add(id);
-                ReadCommand = cmd;
-
-                cmd = connection.CreateCommand();
-                cmd.CommandText = "SELECT id, message FROM fortune";
-                FortuneCommand = cmd;
-
-                DbHandler = rawdb;
-
-            }
-
-            public DbConnection Connection { get; private set; }
-
-            public DbCommand ReadCommand { get; private set; }
-
-            public DbCommand FortuneCommand { get; private set; }
-
-            public RawDb DbHandler { get; private set; }
-
-            public void Dispose()
-            {
-                DbHandler.Push(this);
-            }
-        }
+        private DbCommand FortuneCommand;
 
 
         public async Task<World> LoadSingleQueryRow()
         public async Task<World> LoadSingleQueryRow()
         {
         {
-            using (var conn = Pop())
+            using (var db = _dbProviderFactory.CreateConnection())
             {
             {
-                var cmd = conn.ReadCommand;
-                cmd.Parameters[0].Value = _random.Next(1, 10001);
-                return await ReadSingleRow(conn.Connection, cmd);
-            }
+                db.ConnectionString = _connectionString;
+                await db.OpenAsync();
+                SingleCommand.Connection = db;
+                SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
+                return await ReadSingleRow(db, SingleCommand);
 
 
+            }
         }
         }
 
 
         async Task<World> ReadSingleRow(DbConnection connection, DbCommand cmd)
         async Task<World> ReadSingleRow(DbConnection connection, DbCommand cmd)
@@ -114,36 +69,42 @@ namespace Benchmarks
             }
             }
         }
         }
 
 
-
         public async Task<World[]> LoadMultipleQueriesRows(int count)
         public async Task<World[]> LoadMultipleQueriesRows(int count)
         {
         {
-            using (var conn = Pop())
+            using (var db = _dbProviderFactory.CreateConnection())
             {
             {
-                var cmd = conn.ReadCommand;
-                cmd.Parameters[0].Value = _random.Next(1, 10001);
-                return await LoadMultipleRows(count, conn.Connection, conn.ReadCommand);
+                db.ConnectionString = _connectionString;
+                await db.OpenAsync();
+                return await LoadMultipleRows(count, db);
             }
             }
+
         }
         }
 
 
-        private async Task<World[]> LoadMultipleRows(int count, DbConnection db, DbCommand cmd)
+        private async Task<World[]> LoadMultipleRows(int count, DbConnection db)
         {
         {
-            cmd.Parameters[0].Value = _random.Next(1, 10001);
+            SingleCommand.Connection = db;
+            SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
             var result = new World[count];
             var result = new World[count];
             for (int i = 0; i < result.Length; i++)
             for (int i = 0; i < result.Length; i++)
             {
             {
-                result[i] = await ReadSingleRow(db, cmd);
-                cmd.Parameters[0].Value = _random.Next(1, 10001);
+                result[i] = await ReadSingleRow(db, SingleCommand);
+                SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
             }
             }
             return result;
             return result;
+
         }
         }
 
 
         public async Task<List<Fortune>> LoadFortunesRows()
         public async Task<List<Fortune>> LoadFortunesRows()
         {
         {
             var result = new List<Fortune>();
             var result = new List<Fortune>();
-            using (var conn = Pop())
+
+            using (var db = _dbProviderFactory.CreateConnection())
+
             {
             {
-                var cmd = conn.FortuneCommand;
-                using (var rdr = await cmd.ExecuteReaderAsync(CommandBehavior.Default))
+                db.ConnectionString = _connectionString;
+                await db.OpenAsync();
+                FortuneCommand.Connection = db;
+                using (var rdr = await FortuneCommand.ExecuteReaderAsync(CommandBehavior.CloseConnection))
                 {
                 {
                     while (await rdr.ReadAsync())
                     while (await rdr.ReadAsync())
                     {
                     {

+ 17 - 9
frameworks/CSharp/beetlex/Benchmarks/Program.cs

@@ -12,7 +12,7 @@ using System.Collections.Generic;
 namespace Benchmarks
 namespace Benchmarks
 {
 {
     [Controller]
     [Controller]
-    class Program:IController
+    class Program : IController
     {
     {
         public static void Main(string[] args)
         public static void Main(string[] args)
         {
         {
@@ -34,32 +34,37 @@ namespace Benchmarks
             return new SpanJsonResult(new JsonMessage { message = "Hello, World!" });
             return new SpanJsonResult(new JsonMessage { message = "Hello, World!" });
         }
         }
 
 
-        public async Task<object> queries(int queries)
+        public async Task<object> queries(int queries, IHttpContext context)
         {
         {
             queries = queries < 1 ? 1 : queries > 500 ? 500 : queries;
             queries = queries < 1 ? 1 : queries > 500 ? 500 : queries;
-            var result = await mPgsql.LoadMultipleQueriesRows(queries);
+            var result = await GetDB(context).LoadMultipleQueriesRows(queries);
             return new SpanJsonResult(result);
             return new SpanJsonResult(result);
         }
         }
 
 
-        public async Task<object> db()
+        public RawDb GetDB(IHttpContext context)
         {
         {
-            var result = await mPgsql.LoadSingleQueryRow();
+            return (RawDb)context.Session["DB"];
+        }
+
+        public async Task<object> db(IHttpContext context)
+        {
+            var result = await GetDB(context).LoadSingleQueryRow();
             return new SpanJsonResult(result);
             return new SpanJsonResult(result);
         }
         }
 
 
-        public async Task<object> fortunes()
+        public async Task<object> fortunes(IHttpContext context)
         {
         {
-            var data = await mPgsql.LoadFortunesRows();
+            var data = await GetDB(context).LoadFortunesRows();
             return new FortuneView(data);
             return new FortuneView(data);
         }
         }
 
 
 
 
-        private RawDb mPgsql;
+      
 
 
         [NotAction]
         [NotAction]
         public void Init(HttpApiServer server, string path)
         public void Init(HttpApiServer server, string path)
         {
         {
-            mPgsql = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
+           
         }
         }
     }
     }
 
 
@@ -85,6 +90,9 @@ namespace Benchmarks
             mApiServer.Options.LogToConsole = true;
             mApiServer.Options.LogToConsole = true;
             mApiServer.Options.PrivateBufferPool = true;
             mApiServer.Options.PrivateBufferPool = true;
             mApiServer.Register(typeof(Program).Assembly);
             mApiServer.Register(typeof(Program).Assembly);
+            mApiServer.HttpConnected += (o, e) => {
+                e.Session["DB"] = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
+            };
             mApiServer.Open();
             mApiServer.Open();
             return Task.CompletedTask;
             return Task.CompletedTask;
         }
         }

+ 33 - 33
frameworks/CSharp/beetlex/PlatformBenchmarks/DBRaw.cs

@@ -9,7 +9,7 @@ namespace PlatformBenchmarks
 {
 {
     public class RawDb
     public class RawDb
     {
     {
-    
+
         private readonly ConcurrentRandom _random;
         private readonly ConcurrentRandom _random;
 
 
         private readonly DbProviderFactory _dbProviderFactory;
         private readonly DbProviderFactory _dbProviderFactory;
@@ -22,19 +22,36 @@ namespace PlatformBenchmarks
             _dbProviderFactory = dbProviderFactory;
             _dbProviderFactory = dbProviderFactory;
             _connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             _connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             //_connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
             //_connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=256;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
+            OnCreateCommand();
+        }
+
+        private void OnCreateCommand()
+        {
+            SingleCommand = new Npgsql.NpgsqlCommand();
+            SingleCommand.CommandText = "SELECT id, randomnumber FROM world WHERE id = @Id";
+            var id = SingleCommand.CreateParameter();
+            id.ParameterName = "@Id";
+            id.DbType = DbType.Int32;
+            id.Value = _random.Next(1, 10001);
+            SingleCommand.Parameters.Add(id);
+            FortuneCommand = new Npgsql.NpgsqlCommand();
+            FortuneCommand.CommandText= "SELECT id, message FROM fortune";
         }
         }
 
 
+        private DbCommand SingleCommand;
+
+        private DbCommand FortuneCommand;
+
         public async Task<World> LoadSingleQueryRow()
         public async Task<World> LoadSingleQueryRow()
         {
         {
             using (var db = _dbProviderFactory.CreateConnection())
             using (var db = _dbProviderFactory.CreateConnection())
             {
             {
                 db.ConnectionString = _connectionString;
                 db.ConnectionString = _connectionString;
                 await db.OpenAsync();
                 await db.OpenAsync();
+                SingleCommand.Connection = db;
+                SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
+                return await ReadSingleRow(db, SingleCommand);
 
 
-                using (var cmd = CreateReadCommand(db))
-                {
-                    return await ReadSingleRow(db, cmd);
-                }
             }
             }
         }
         }
 
 
@@ -52,19 +69,6 @@ namespace PlatformBenchmarks
             }
             }
         }
         }
 
 
-        DbCommand CreateReadCommand(DbConnection connection)
-        {
-            var cmd = connection.CreateCommand();
-            cmd.CommandText = "SELECT id, randomnumber FROM world WHERE id = @Id";
-            var id = cmd.CreateParameter();
-            id.ParameterName = "@Id";
-            id.DbType = DbType.Int32;
-            id.Value = _random.Next(1, 10001);
-            cmd.Parameters.Add(id);
-            return cmd;
-        }
-
-
         public async Task<World[]> LoadMultipleQueriesRows(int count)
         public async Task<World[]> LoadMultipleQueriesRows(int count)
         {
         {
             using (var db = _dbProviderFactory.CreateConnection())
             using (var db = _dbProviderFactory.CreateConnection())
@@ -78,18 +82,16 @@ namespace PlatformBenchmarks
 
 
         private async Task<World[]> LoadMultipleRows(int count, DbConnection db)
         private async Task<World[]> LoadMultipleRows(int count, DbConnection db)
         {
         {
-            using (var cmd = CreateReadCommand(db))
+            SingleCommand.Connection = db;
+            SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
+            var result = new World[count];
+            for (int i = 0; i < result.Length; i++)
             {
             {
-                cmd.Parameters["@Id"].Value = _random.Next(1, 10001);
-
-                var result = new World[count];
-                for (int i = 0; i < result.Length; i++)
-                {
-                    result[i] = await ReadSingleRow(db, cmd);
-                    cmd.Parameters["@Id"].Value = _random.Next(1, 10001);
-                }
-                return result;
+                result[i] = await ReadSingleRow(db, SingleCommand);
+                SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
             }
             }
+            return result;
+
         }
         }
 
 
         public async Task<List<Fortune>> LoadFortunesRows()
         public async Task<List<Fortune>> LoadFortunesRows()
@@ -97,13 +99,12 @@ namespace PlatformBenchmarks
             var result = new List<Fortune>();
             var result = new List<Fortune>();
 
 
             using (var db = _dbProviderFactory.CreateConnection())
             using (var db = _dbProviderFactory.CreateConnection())
-            using (var cmd = db.CreateCommand())
-            {
-                cmd.CommandText = "SELECT id, message FROM fortune";
 
 
+            { 
                 db.ConnectionString = _connectionString;
                 db.ConnectionString = _connectionString;
                 await db.OpenAsync();
                 await db.OpenAsync();
-                using (var rdr = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection))
+                FortuneCommand.Connection = db;
+                using (var rdr = await FortuneCommand.ExecuteReaderAsync(CommandBehavior.CloseConnection))
                 {
                 {
                     while (await rdr.ReadAsync())
                     while (await rdr.ReadAsync())
                     {
                     {
@@ -115,7 +116,6 @@ namespace PlatformBenchmarks
                     }
                     }
                 }
                 }
             }
             }
-
             result.Add(new Fortune { Message = "Additional fortune added at request time." });
             result.Add(new Fortune { Message = "Additional fortune added at request time." });
             result.Sort();
             result.Sort();
             return result;
             return result;

+ 63 - 20
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpHandler.cs

@@ -5,6 +5,7 @@ using SpanJson;
 using System;
 using System;
 using System.Collections.Generic;
 using System.Collections.Generic;
 using System.Text;
 using System.Text;
+using System.Threading.Tasks;
 
 
 namespace PlatformBenchmarks
 namespace PlatformBenchmarks
 {
 {
@@ -44,12 +45,15 @@ namespace PlatformBenchmarks
 
 
         private static byte _question = 63;
         private static byte _question = 63;
 
 
-        private RawDb mPgsql;
+        private NextQueueGroup NextQueueGroup;
 
 
         public HttpHandler()
         public HttpHandler()
         {
         {
-
-            mPgsql = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
+            int threads = System.Math.Min(Environment.ProcessorCount, 16);
+            if (Environment.ProcessorCount > 20)
+                threads = Environment.ProcessorCount * 2 / 3;
+            NextQueueGroup = new NextQueueGroup(2);
+          
         }
         }
 
 
         public void Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
         public void Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
@@ -62,7 +66,11 @@ namespace PlatformBenchmarks
         public override void Connected(IServer server, ConnectedEventArgs e)
         public override void Connected(IServer server, ConnectedEventArgs e)
         {
         {
             base.Connected(server, e);
             base.Connected(server, e);
-            e.Session.Tag = new HttpToken();
+            e.Session.Socket.NoDelay = true;
+            var token = new HttpToken();
+            token.Db = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
+            token.NextQueue = NextQueueGroup.Next();
+            e.Session.Tag = token;
         }
         }
 
 
         private int AnalysisUrl(ReadOnlySpan<byte> url)
         private int AnalysisUrl(ReadOnlySpan<byte> url)
@@ -76,13 +84,33 @@ namespace PlatformBenchmarks
 
 
         }
         }
 
 
-        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
+        class RequestWork : IEventWork
         {
         {
-            base.SessionReceive(server, e);
-            PipeStream pipeStream = e.Session.Stream.ToPipeStream();
-            HttpToken token = (HttpToken)e.Session.Tag;
+            public void Dispose()
+            {
+               
+            }
+
+            public PipeStream Stream { get; set; }
+
+            public ISession Session { get; set; }
+
+            public HttpToken Token { get; set; }
+
+            public HttpHandler Handler { get; set; }
+
+            public Task Execute()
+            {
+                Handler.OnProcess(Stream, Token, Session);
+                return Task.CompletedTask;
+            }
+        }
+
+        private void OnProcess(PipeStream pipeStream,HttpToken token,ISession sessino)
+        {
+
             var result = pipeStream.IndexOf(_line.Data);
             var result = pipeStream.IndexOf(_line.Data);
-            if(result.End ==null)
+            if (result.End == null)
             {
             {
                 return;
                 return;
             }
             }
@@ -110,51 +138,66 @@ namespace PlatformBenchmarks
                     count++;
                     count++;
                 }
                 }
             }
             }
-            OnStartLine(http, method, url, e.Session, token, pipeStream);
+            OnStartLine(http, method, url, sessino, token, pipeStream);
+        }
+
+        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
+        {
+            base.SessionReceive(server, e);
+            PipeStream pipeStream = e.Session.Stream.ToPipeStream();
+            HttpToken token = (HttpToken)e.Session.Tag;
+            
+            //RequestWork work = new RequestWork();
+            //work.Handler = this;
+            //work.Session = e.Session;
+            //work.Stream = pipeStream;
+            //work.Token = token;
+            //token.NextQueue.Enqueue(work);
+            OnProcess(pipeStream, token, e.Session);
         }
         }
 
 
 
 
 
 
-        protected virtual void OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
+        public virtual void OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
         {
         {
             int queryIndex = AnalysisUrl(url);
             int queryIndex = AnalysisUrl(url);
-            ReadOnlySpan<byte> baseUrl=default;
+            ReadOnlySpan<byte> baseUrl = default;
             ReadOnlySpan<byte> queryString = default;
             ReadOnlySpan<byte> queryString = default;
-            if (queryIndex>0)
+            if (queryIndex > 0)
             {
             {
                 baseUrl = url.Slice(0, queryIndex);
                 baseUrl = url.Slice(0, queryIndex);
-                queryString = url.Slice(queryIndex+1, url.Length - queryIndex - 1);
+                queryString = url.Slice(queryIndex + 1, url.Length - queryIndex - 1);
             }
             }
             else
             else
             {
             {
                 baseUrl = url;
                 baseUrl = url;
             }
             }
-            OnWriteHeader(stream, token);         
-            if (baseUrl.Length== _path_Plaintext.Length && baseUrl.StartsWith(_path_Plaintext))
+            OnWriteHeader(stream, token);
+            if (baseUrl.Length == _path_Plaintext.Length && baseUrl.StartsWith(_path_Plaintext))
             {
             {
                 stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
                 stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
                 Plaintext(url, stream, token, session);
                 Plaintext(url, stream, token, session);
             }
             }
-            else if (baseUrl.Length==_path_Json.Length && baseUrl.StartsWith(_path_Json))
+            else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
                 Json(url, stream, token, session);
                 Json(url, stream, token, session);
             }
             }
-            else if(baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
+            else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
                 db(stream, token, session);
                 db(stream, token, session);
             }
             }
-            else if(baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
+            else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
             {
             {
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);
                 queries(Encoding.ASCII.GetString(queryString), stream, token, session);
                 queries(Encoding.ASCII.GetString(queryString), stream, token, session);
             }
             }
-            else if(baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
+            else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
             {
             {
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
                 OnWriteContentLength(stream, token);
                 OnWriteContentLength(stream, token);

+ 4 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/HttpToken.cs

@@ -15,6 +15,10 @@ namespace PlatformBenchmarks
             private set;
             private set;
         }       
         }       
 
 
+        public RawDb Db { get; set; }
+
+        public NextQueue NextQueue { get; set; }
+
         public HttpToken()
         public HttpToken()
         {
         {
             Buffer = new byte[2048];
             Buffer = new byte[2048];

+ 10 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/IEventWork.cs

@@ -0,0 +1,10 @@
+using System;
+using System.Threading.Tasks;
+
+namespace PlatformBenchmarks
+{
+    public interface IEventWork : IDisposable
+    {
+        Task Execute();
+    }
+}

+ 108 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/NextQueue.cs

@@ -0,0 +1,108 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace PlatformBenchmarks
+{
+    public class NextQueue : IDisposable
+    {
+        public NextQueue()
+        {
+            mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();
+          
+        }
+
+        private readonly object _workSync = new object();
+
+        private bool _doingWork;
+
+        private int mCount;
+
+        private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;
+
+        public int Count => System.Threading.Interlocked.Add(ref mCount, 0);
+
+        public void Enqueue(IEventWork item)
+        {
+            mQueue.Enqueue(item);
+            System.Threading.Interlocked.Increment(ref mCount);
+            lock (_workSync)
+            {
+                if (!_doingWork)
+                {
+                    System.Threading.ThreadPool.QueueUserWorkItem(OnStart);
+                    _doingWork = true;
+                }
+            }
+        }
+
+        private void OnError(Exception e, IEventWork work)
+        {
+            try
+            {
+                Error?.Invoke(e, work);
+            }
+            catch
+            {
+
+            }
+        }
+
+        public static Action<Exception, IEventWork> Error { get; set; }
+
+        private async void OnStart(object state)
+        {
+            while (true)
+            {
+                while (mQueue.TryDequeue(out IEventWork item))
+                {
+                    System.Threading.Interlocked.Decrement(ref mCount);
+                    using (item)
+                    {
+                        try
+                        {
+                            await item.Execute();
+                        }
+                        catch (Exception e_)
+                        {
+                            OnError(e_, item);
+                        }
+                    }
+                }
+                lock (_workSync)
+                {
+                    if (mQueue.IsEmpty)
+                    {
+                        try
+                        {
+                            Unused?.Invoke();
+                        }
+                        catch { }
+                        _doingWork = false;
+                        return;
+                    }
+                }
+            }
+
+        }
+
+        public Action Unused { get; set; }
+
+        public void Dispose()
+        {
+            while (mQueue.TryDequeue(out IEventWork work))
+            {
+                try
+                {
+                    work.Dispose();
+                }
+                catch
+                {
+
+                }
+
+            }
+        }
+    }
+}

+ 57 - 0
frameworks/CSharp/beetlex/PlatformBenchmarks/NextQueueGroup.cs

@@ -0,0 +1,57 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace PlatformBenchmarks
+{
+    public class NextQueueGroup
+    {
+
+        private List<NextQueue> mQueues = new List<NextQueue>();
+
+        public NextQueueGroup(int count = 0)
+        {
+            if (count == 0)
+            {
+                count = Math.Min(Environment.ProcessorCount, 16);
+            }
+
+            for (int i = 0; i < count; i++)
+                mQueues.Add(new NextQueue());
+        }
+
+        private long mIndex = 0;
+
+        public void Enqueue(IEventWork item, int waitLength = 5)
+        {
+            for (int i = 0; i < mQueues.Count; i++)
+            {
+                if (mQueues[i].Count < waitLength)
+                {
+                    mQueues[i].Enqueue(item);
+                    return;
+                }
+            }
+            Next().Enqueue(item);
+        }
+
+        public NextQueue Next(int waitLength)
+        {
+            for (int i = 0; i < mQueues.Count; i++)
+            {
+                if (mQueues[i].Count < waitLength)
+                {
+                    return mQueues[i];
+
+                }
+            }
+            return Next();
+        }
+
+        public NextQueue Next()
+        {
+            var index = System.Threading.Interlocked.Increment(ref mIndex);
+            return mQueues[(int)(index % mQueues.Count)];
+        }
+    }
+}

+ 1 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/db.cs

@@ -14,7 +14,7 @@ namespace PlatformBenchmarks
         {
         {
             try
             try
             {
             {
-                var data = await mPgsql.LoadSingleQueryRow();
+                var data = await token.Db.LoadSingleQueryRow();
                 await JsonSerializer.NonGeneric.Utf8.SerializeAsync(data, stream);
                 await JsonSerializer.NonGeneric.Utf8.SerializeAsync(data, stream);
             }
             }
             catch(Exception e_)
             catch(Exception e_)

+ 1 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/fortunes.cs

@@ -20,7 +20,7 @@ namespace PlatformBenchmarks
         {
         {
             try
             try
             {
             {
-                var data = await mPgsql.LoadFortunesRows();
+                var data = await token.Db.LoadFortunesRows();
                 stream.Write(_fortunesTableStart.Data, 0, _fortunesTableStart.Length);
                 stream.Write(_fortunesTableStart.Data, 0, _fortunesTableStart.Length);
                 foreach (var item in data)
                 foreach (var item in data)
                 {
                 {

+ 1 - 1
frameworks/CSharp/beetlex/PlatformBenchmarks/queries.cs

@@ -29,7 +29,7 @@ namespace PlatformBenchmarks
                 count = 1;
                 count = 1;
             try
             try
             {
             {
-                var data = await mPgsql.LoadMultipleQueriesRows(count);
+                var data = await token.Db.LoadMultipleQueriesRows(count);
                 await JsonSerializer.NonGeneric.Utf8.SerializeAsync(data, stream);
                 await JsonSerializer.NonGeneric.Utf8.SerializeAsync(data, stream);
             }
             }
             catch (Exception e_)
             catch (Exception e_)