DBRaw.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Collections.Concurrent;
  4. using System.Data;
  5. using System.Data.Common;
  6. using System.Linq;
  7. using System.Runtime.Versioning;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using System.Runtime.InteropServices.ComTypes;
  11. using BeetleX.EventArgs;
  12. namespace PlatformBenchmarks
  13. {
  /// <summary>
  /// Raw ADO.NET data access for the benchmark queries against the
  /// <c>world</c> and <c>fortune</c> tables.
  /// </summary>
  /// <remarks>
  /// The select commands are created once per instance and re-bound to the borrowed
  /// connection on every call. They are mutated without synchronization, so concurrent
  /// calls on the same instance would interfere with each other.
  /// NOTE(review): confirm that each caller owns its own instance.
  /// </remarks>
  public class RawDb
  {
      private readonly ConcurrentRandom _random;
      private readonly DbProviderFactory _dbProviderFactory;
      public static string _connectionString = null;

      // Reusable commands; only Connection and parameter values change between calls.
      private DbCommand SingleCommand;
      private DbCommand FortuneCommand;

      /// <summary>Creates the data access object and builds its reusable commands.</summary>
      /// <param name="random">Source of the random ids and random numbers.</param>
      /// <param name="dbProviderFactory">Provider factory; stored but not used by the code in this class.</param>
      public RawDb(ConcurrentRandom random, DbProviderFactory dbProviderFactory)
      {
          _random = random;
          _dbProviderFactory = dbProviderFactory;
          OnCreateCommand();
      }

      /// <summary>Builds the single-row select command and the fortune select command.</summary>
      private void OnCreateCommand()
      {
          SingleCommand = new Npgsql.NpgsqlCommand();
          SingleCommand.CommandText = "SELECT id, randomnumber FROM world WHERE id = @Id";
          var id = SingleCommand.CreateParameter();
          id.ParameterName = "@Id";
          id.DbType = DbType.Int32;
          id.Value = _random.Next(1, 10001);
          SingleCommand.Parameters.Add(id);

          FortuneCommand = new Npgsql.NpgsqlCommand();
          FortuneCommand.CommandText = "SELECT id, message FROM fortune";
      }

      /// <summary>Loads one <c>world</c> row selected by a random id.</summary>
      /// <returns>The row read from the database.</returns>
      public async Task<World> LoadSingleQueryRow()
      {
          using (var db = await DBConnectionGroupPool.Pop())
          {
              SingleCommand.Connection = db.Connection;
              SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
              return await ReadSingleRow(db.Connection, SingleCommand);
          }
      }

      /// <summary>Executes <paramref name="cmd"/> and maps the first row to a <see cref="World"/>.</summary>
      /// <param name="connection">Not used by this method; the command carries its own connection.</param>
      /// <param name="cmd">Command returning the id in column 0 and the random number in column 1.</param>
      async Task<World> ReadSingleRow(DbConnection connection, DbCommand cmd)
      {
          using (var rdr = await cmd.ExecuteReaderAsync(CommandBehavior.SingleRow))
          {
              // The result of ReadAsync is not checked: a row is assumed to exist for the requested id.
              await rdr.ReadAsync();
              return new World
              {
                  Id = rdr.GetInt32(0),
                  RandomNumber = rdr.GetInt32(1)
              };
          }
      }

      /// <summary>Loads <paramref name="count"/> rows, each selected by a fresh random id.</summary>
      /// <param name="count">Number of sequential single-row queries to run.</param>
      /// <returns>An array holding one row per query.</returns>
      public async Task<World[]> LoadMultipleQueriesRows(int count)
      {
          using (var db = await DBConnectionGroupPool.Pop())
          {
              return await LoadMultipleRows(count, db.Connection);
          }
      }

      /// <summary>Runs <paramref name="count"/> single-row queries one after another on <paramref name="db"/>.</summary>
      private async Task<World[]> LoadMultipleRows(int count, DbConnection db)
      {
          SingleCommand.Connection = db;
          SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
          var result = new World[count];
          for (int i = 0; i < result.Length; i++)
          {
              result[i] = await ReadSingleRow(db, SingleCommand);
              // Prepare the id for the next iteration.
              SingleCommand.Parameters[0].Value = _random.Next(1, 10001);
          }
          return result;
      }

      /// <summary>
      /// Loads every fortune, appends one extra fortune created at request time,
      /// and sorts the list with the default comparer.
      /// </summary>
      /// <returns>The sorted list of fortunes.</returns>
      public async Task<List<Fortune>> LoadFortunesRows()
      {
          var result = new List<Fortune>();
          using (var db = await DBConnectionGroupPool.Pop())
          {
              FortuneCommand.Connection = db.Connection;
              using (var rdr = await FortuneCommand.ExecuteReaderAsync(CommandBehavior.Default))
              {
                  while (await rdr.ReadAsync())
                  {
                      result.Add(new Fortune
                      {
                          Id = rdr.GetInt32(0),
                          Message = rdr.GetString(1)
                      });
                  }
              }
          }
          result.Add(new Fortune { Message = "Additional fortune added at request time." });
          result.Sort();
          return result;
      }

      /// <summary>
      /// Reads <paramref name="count"/> random rows, assigns each a new random number,
      /// and writes all changes back with a single batched UPDATE.
      /// </summary>
      /// <param name="count">Number of rows to read and update.</param>
      /// <returns>The rows carrying their newly assigned random numbers.</returns>
      public async Task<World[]> LoadMultipleUpdatesRows(int count)
      {
          using (var db = await DBConnectionGroupPool.Pop())
          {
              var updateCmd = UpdateCommandsCached.PopCommand(count);
              try
              {
                  updateCmd.Connection = db.Connection;
                  SingleCommand.Connection = db.Connection;
                  // "% 10000 + 1" maps the draw into 1..10000, assuming Next returns a non-negative value.
                  SingleCommand.Parameters[0].Value = _random.Next(1, int.MaxValue) % 10000 + 1;
                  var results = new World[count];
                  for (int i = 0; i < count; i++)
                  {
                      results[i] = await ReadSingleRow(db.Connection, SingleCommand);
                      SingleCommand.Parameters[0].Value = _random.Next(1, int.MaxValue) % 10000 + 1;
                  }
                  for (int i = 0; i < count; i++)
                  {
                      var randomNumber = _random.Next(1, int.MaxValue) % 10000 + 1;
                      // Parameters are laid out pairwise: [id_0, random_0, id_1, random_1, ...].
                      updateCmd.Parameters[i * 2].Value = results[i].Id;
                      updateCmd.Parameters[i * 2 + 1].Value = randomNumber;
                      results[i].RandomNumber = randomNumber;
                  }
                  await updateCmd.ExecuteNonQueryAsync();
                  return results;
              }
              finally
              {
                  // Always hand the command back to the cache. Failures propagate unchanged,
                  // with their original stack trace, because nothing catches them here.
                  UpdateCommandsCached.PushCommand(count, updateCmd);
              }
          }
      }
  }
  /// <summary>
  /// Cache of reusable batch-update commands, kept in one stack per batch size.
  /// </summary>
  internal class UpdateCommandsCached
  {
      // Number of slots in the per-batch-size tables.
      private const int TableSize = 1024;
      // Init() pre-creates commands for batch sizes 1..PreloadMaxBatch.
      private const int PreloadMaxBatch = 500;
      // Number of commands pre-created for each batch size.
      private const int PreloadPerBatch = 10;

      // Private lock target: locking on typeof(...) would share the lock with any other
      // code that can reach the Type object.
      private static readonly object mInitLock = new object();

      private static System.Collections.Concurrent.ConcurrentStack<DbCommand>[] mCacheTable
          = new System.Collections.Concurrent.ConcurrentStack<DbCommand>[TableSize];
      public static string[] IDParamereNames = new string[TableSize];
      public static string[] RandomParamereNames = new string[TableSize];

      // volatile so the unlocked fast-path check in Init() observes the write made under the lock.
      private static volatile bool mInited = false;

      /// <summary>Fills the parameter-name tables and creates an empty stack for every slot.</summary>
      static UpdateCommandsCached()
      {
          for (int i = 0; i < TableSize; i++)
          {
              IDParamereNames[i] = $"@Id_{i}";
              RandomParamereNames[i] = $"@Random_{i}";
              mCacheTable[i] = new System.Collections.Concurrent.ConcurrentStack<DbCommand>();
          }
      }

      /// <summary>
      /// Creates a command for a batch of <paramref name="count"/> rows, adding one id
      /// parameter and one random-number parameter per row, in that order.
      /// </summary>
      private static DbCommand CreatCommand(int count)
      {
          DbCommand cmd = new Npgsql.NpgsqlCommand();
          cmd.CommandText = BatchUpdateString.Query(count);
          for (int i = 0; i < count; i++)
          {
              var id = cmd.CreateParameter();
              id.ParameterName = IDParamereNames[i];
              id.DbType = DbType.Int32;
              cmd.Parameters.Add(id);

              var random = cmd.CreateParameter();
              random.ParameterName = RandomParamereNames[i];
              random.DbType = DbType.Int32;
              cmd.Parameters.Add(random);
          }
          return cmd;
      }

      /// <summary>Returns a command to the stack for its batch size.</summary>
      /// <param name="count">Batch size the command was created for; also the table index.</param>
      /// <param name="cmd">The command to make available for reuse.</param>
      public static void PushCommand(int count, DbCommand cmd)
      {
          mCacheTable[count].Push(cmd);
      }

      /// <summary>Takes a cached command for the batch size, or creates one if the stack is empty.</summary>
      /// <param name="count">Batch size; also the table index.</param>
      /// <returns>A command with <c>2 * count</c> parameters.</returns>
      public static DbCommand PopCommand(int count)
      {
          if (mCacheTable[count].TryPop(out DbCommand cmd))
          {
              return cmd;
          }
          return CreatCommand(count);
      }

      /// <summary>
      /// Pre-creates commands for batch sizes 1 to 500, ten per size. Only the first call does
      /// any work; later calls return immediately.
      /// </summary>
      public static void Init()
      {
          if (mInited)
          {
              return;
          }
          lock (mInitLock)
          {
              if (mInited)
              {
                  return;
              }
              for (int i = 1; i <= PreloadMaxBatch; i++)
              {
                  for (int k = 0; k < PreloadPerBatch; k++)
                  {
                      mCacheTable[i].Push(CreatCommand(i));
                  }
              }
              mInited = true;
              HttpServer.ApiServer.Log(LogType.Info, null, "Init update commands cached");
          }
      }
  }
  /// <summary>
  /// Builds and memoizes the batched <c>UPDATE world ... FROM (VALUES ...)</c> statement
  /// for a given number of rows.
  /// </summary>
  internal class BatchUpdateString
  {
      private const int MaxBatch = 500;

      // Indexed by batch size; slot 0 is never filled.
      private static string[] _queries = new string[MaxBatch + 1];

      /// <summary>Returns the UPDATE statement for <paramref name="batchSize"/> rows.</summary>
      /// <param name="batchSize">Number of (id, randomNumber) pairs in the statement; 1 to 500.</param>
      /// <returns>
      /// SQL text using the parameters <c>@Id_0..@Id_{n-1}</c> and <c>@Random_0..@Random_{n-1}</c>.
      /// </returns>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="batchSize"/> is less than 1 or greater than 500.
      /// </exception>
      public static string Query(int batchSize)
      {
          // Fail with a clear argument error instead of an index error or a failure
          // deep inside the string building.
          if (batchSize < 1 || batchSize > MaxBatch)
          {
              throw new ArgumentOutOfRangeException(
                  nameof(batchSize), batchSize, $"Batch size must be between 1 and {MaxBatch}.");
          }

          var cached = _queries[batchSize];
          if (cached != null)
          {
              return cached;
          }

          var lastIndex = batchSize - 1;
          var sb = StringBuilderCache.Acquire();
          sb.Append("UPDATE world SET randomNumber = temp.randomNumber FROM (VALUES ");

          // Every tuple except the last is followed by a separator.
          for (int i = 0; i < lastIndex; i++)
          {
              sb.Append("(@Id_").Append(i).Append(", @Random_").Append(i).Append("), ");
          }

          sb.Append("(@Id_").Append(lastIndex).Append(", @Random_").Append(lastIndex)
            .Append(") ORDER BY 1) AS temp(id, randomNumber) WHERE temp.id = world.id");

          // Racing threads may both build the same text; the results are identical, so either write is fine.
          return _queries[batchSize] = StringBuilderCache.GetStringAndRelease(sb);
      }
  }
  /// <summary>
  /// Keeps at most one reusable <see cref="StringBuilder"/> per thread.
  /// </summary>
  internal static class StringBuilderCache
  {
      private const int DefaultCapacity = 1386;
      private const int MaxBuilderSize = DefaultCapacity * 3;

      [ThreadStatic]
      private static StringBuilder t_cachedInstance;

      /// <summary>Gets a cleared builder with at least the requested capacity.</summary>
      /// <remarks>
      /// Requests up to the size limit are raised to the default capacity. If the builder cached
      /// for this thread is large enough it is returned and the cache emptied; otherwise a new
      /// builder is allocated.
      /// </remarks>
      /// <param name="capacity">Requested capacity.</param>
      public static StringBuilder Acquire(int capacity = DefaultCapacity)
      {
          // Oversized requests never touch the cache.
          if (capacity > MaxBuilderSize)
          {
              return new StringBuilder(capacity);
          }

          int wanted = Math.Max(capacity, DefaultCapacity);
          StringBuilder cached = t_cachedInstance;

          // Allocate rather than grow a cached builder that is too small,
          // which avoids fragmenting the builder into chunks.
          if (cached == null || wanted > cached.Capacity)
          {
              return new StringBuilder(wanted);
          }

          t_cachedInstance = null;
          cached.Clear();
          return cached;
      }

      /// <summary>Stores the builder for reuse on this thread unless its capacity exceeds the size limit.</summary>
      /// <param name="sb">The builder the caller has finished with.</param>
      public static void Release(StringBuilder sb)
      {
          if (sb.Capacity > MaxBuilderSize)
          {
              return;
          }
          t_cachedInstance = sb;
      }

      /// <summary>Returns the builder's text, then releases the builder to the cache.</summary>
      /// <param name="sb">The builder to read and release.</param>
      public static string GetStringAndRelease(StringBuilder sb)
      {
          string text = sb.ToString();
          Release(sb);
          return text;
      }
  }
  /// <summary>
  /// Spreads pre-opened database connections over several independent pools and hands
  /// them out round-robin.
  /// </summary>
  internal class DBConnectionGroupPool
  {
      // Private lock target instead of a Type object that other code could also lock on.
      private static readonly object mInitLock = new object();

      // Monotonic counter used to pick the next pool.
      private static long mIndex;
      private static List<DBconnectionPool> mPools = new List<DBconnectionPool>();

      // volatile so the unlocked fast-path check in Init() observes the write made under the lock.
      private static volatile bool mInited = false;

      /// <summary>
      /// Creates the pools and opens their connections. Only the first call does any work.
      /// </summary>
      /// <param name="max">Total number of connections to divide between the pools; at least 1.</param>
      /// <param name="connectionstring">Connection string used for every connection.</param>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="max"/> is less than 1.</exception>
      public static void Init(int max, string connectionstring)
      {
          if (mInited)
          {
              return;
          }
          lock (mInitLock)
          {
              if (mInited)
              {
                  return;
              }
              if (max < 1)
              {
                  throw new ArgumentOutOfRangeException(
                      nameof(max), max, "At least one connection is required.");
              }

              int group = Program.UpDB ? 4 : 16;
              // Never create more pools than connections: a pool with zero connections
              // would leave every caller routed to it waiting forever.
              group = Math.Min(group, max);

              HttpServer.ApiServer.Log(BeetleX.EventArgs.LogType.Info, null, $"connection pool init {max} group {group}");
              int itemcount = max / group;
              for (int i = 0; i < group; i++)
              {
                  DBconnectionPool pool = new DBconnectionPool();
                  pool.Init(itemcount, connectionstring);
                  mPools.Add(pool);
              }
              HttpServer.ApiServer.Log(LogType.Info, null, "Init connection pool completed");
              mInited = true;
          }
      }

      /// <summary>Borrows a connection from the next pool in round-robin order.</summary>
      /// <returns>
      /// A task yielding the borrowed item; disposing the item returns it to its pool.
      /// </returns>
      public static Task<DBConnectionItem> Pop()
      {
          long id = System.Threading.Interlocked.Increment(ref mIndex);
          return mPools[(int)(id % mPools.Count)].Pop();
      }

      /// <summary>A single pool of open connections with a FIFO queue of waiting borrowers.</summary>
      public class DBconnectionPool
      {
          // Guards both collections; private so external code cannot contend on it.
          private readonly object mGate = new object();
          private Stack<DBConnectionItem> mConnectionPool = new Stack<DBConnectionItem>();
          private Queue<TaskCompletionSource<DBConnectionItem>> mWaitQueue = new Queue<TaskCompletionSource<DBConnectionItem>>();

          /// <summary>Opens <paramref name="count"/> connections and adds them to the pool.</summary>
          /// <param name="count">Number of connections to open.</param>
          /// <param name="connectionString">Connection string for each connection.</param>
          public void Init(int count, string connectionString)
          {
              for (int i = 0; i < count; i++)
              {
                  DbConnection connection = Npgsql.NpgsqlFactory.Instance.CreateConnection();
                  connection.ConnectionString = connectionString;
                  connection.Open();

                  DBConnectionItem item = new DBConnectionItem();
                  item.Pool = this;
                  item.Connection = connection;
                  mConnectionPool.Push(item);
              }
          }

          /// <summary>
          /// Returns an idle connection immediately, or a task that completes when one is returned.
          /// </summary>
          public Task<DBConnectionItem> Pop()
          {
              lock (mGate)
              {
                  if (mConnectionPool.Count > 0)
                  {
                      return Task.FromResult(mConnectionPool.Pop());
                  }

                  // Continuations run asynchronously so the thread returning a connection
                  // is not hijacked to execute the waiter's code.
                  var waiter = new TaskCompletionSource<DBConnectionItem>(
                      TaskCreationOptions.RunContinuationsAsynchronously);
                  mWaitQueue.Enqueue(waiter);
                  return waiter.Task;
              }
          }

          /// <summary>
          /// Returns a connection: hands it to the longest-waiting borrower if any,
          /// otherwise puts it back on the idle stack.
          /// </summary>
          /// <param name="item">The connection item being returned.</param>
          public void Push(DBConnectionItem item)
          {
              TaskCompletionSource<DBConnectionItem> waiter = null;
              lock (mGate)
              {
                  if (mWaitQueue.Count > 0)
                  {
                      waiter = mWaitQueue.Dequeue();
                  }
                  else
                  {
                      mConnectionPool.Push(item);
                  }
              }

              // Complete outside the lock to keep the critical section short.
              waiter?.SetResult(item);
          }
      }

      /// <summary>
      /// A borrowed connection. Disposing it returns it to its pool; the underlying
      /// connection is not closed.
      /// </summary>
      public class DBConnectionItem : IDisposable
      {
          public DBconnectionPool Pool { get; set; }
          public DbConnection Connection { get; set; }

          /// <summary>Returns this item to <see cref="Pool"/>.</summary>
          public void Dispose()
          {
              Pool.Push(this);
          }
      }
  }
  356. }