Przeglądaj źródła

[mORMot] - upgrade to 2.2.7414 (#8949)

- added /cached_query implementation for raw server
 - increased threads count for async server cpuCount * 2 -> cpuCount * 4

Co-authored-by: pavel.mash <[email protected]>
pavelmash 1 rok temu
rodzic
commit
c954e78443

+ 1 - 0
frameworks/Pascal/mormot/benchmark_config.json

@@ -31,6 +31,7 @@
         "dockerfile": "mormot.dockerfile",
         "db_url": "/rawdb",
         "query_url": "/rawqueries?queries=",
+        "cached_query_url": "/rawcached?count=",
         "fortune_url": "/rawfortunes",
         "update_url": "/rawupdates?queries=",
         "port": 8080,

+ 1 - 1
frameworks/Pascal/mormot/setup_and_build.sh

@@ -35,7 +35,7 @@ echo "Download statics from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot/static
 
 # uncomment for fixed commit URL
-URL=https://github.com/synopse/mORMot2/tarball/7dc50900266f07454fe60b60e4a2755ce445ddeb
+URL=https://github.com/synopse/mORMot2/tarball/527b3fb11cb4dad5f2c03ace293b550f85504420
 #URL="https://api.github.com/repos/synopse/mORMot2/tarball/$USED_TAG"
 echo "Download and unpacking mORMot sources from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot  --strip-components=1

+ 78 - 51
frameworks/Pascal/mormot/src/raw.pas

@@ -54,7 +54,7 @@ type
   end;
   TWorlds = array of TWorldRec;
   TFortune = packed record
-    id: integer;
+    id: PtrUInt;
     message: PUtf8Char;
   end;
   TFortunes = array of TFortune;
@@ -86,6 +86,7 @@ type
     fStore: TRestServerDB;
     fTemplate: TSynMustache;
     fCachedWorldsTable: POrmCacheTable;
+    fRawCache: TOrmWorlds;
     fDbPool: TSqlDBPostgresConnectionProperties;
     procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
     procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
@@ -107,6 +108,7 @@ type
     function updates(ctxt: THttpServerRequest): cardinal;
     function rawdb(ctxt: THttpServerRequest): cardinal;
     function rawqueries(ctxt: THttpServerRequest): cardinal;
+    function rawcached(ctxt: THttpServerRequest): cardinal;
     function rawfortunes(ctxt: THttpServerRequest): cardinal;
     function rawupdates(ctxt: THttpServerRequest): cardinal;
     // asynchronous PostgreSQL pipelined DB access
@@ -144,13 +146,13 @@ const
                      '</html>';
 
 
-function ComputeRandomWorld: integer; inline;
+function ComputeRandomWorld(gen: PLecuyer): integer; inline;
 begin
-  result := Random32(WORLD_COUNT) + 1;
+  result := gen^.Next(WORLD_COUNT) + 1;
 end;
 
 function GetQueriesParamValue(ctxt: THttpServerRequest;
-  const search: RawUtf8 = 'QUERIES='): cardinal;
+  const search: RawUtf8 = 'QUERIES='): cardinal; inline;
 begin
   if not ctxt.UrlParam(search, result) or
      (result = 0) then
@@ -190,6 +192,7 @@ begin
   if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
     fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
   fCachedWorldsTable := fStore.Orm.Cache.Table(TOrmCachedWorld);
+  fStore.Orm.RetrieveListObjArray(fRawCache, TOrmCachedWorld, 'order by id', []);
   // initialize the mustache template for /fortunes
   fTemplate := TSynMustache.Parse(FORTUNES_TPL);
   // setup the HTTP server
@@ -222,22 +225,26 @@ begin
   fHttpServer.Free;
   fStore.Free;
   fModel.Free;
-  fDBPool.free;
+  fDBPool.Free;
+  ObjArrayClear(fRawCache);
   inherited Destroy;
 end;
 
 // query DB world table for /rawqueries and /rawupdates endpoints
 
-function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
+function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt;
+  out res: TWorlds): boolean;
 var
   conn: TSqlDBConnection;
   stmt: ISqlDBStatement;
   pConn: TSqlDBPostgresConnection absolute conn;
   pStmt: TSqlDBPostgresStatement;
+  gen: PLecuyer;
   i: PtrInt;
 begin
   result := false;
   SetLength(res{%H-}, cnt);
+  gen := Lecuyer;
   conn := fDbPool.ThreadSafeConnection;
   // specific code to use PostgresSQL pipelining mode
   // see test_nosync in
@@ -247,7 +254,7 @@ begin
   pStmt := TSqlDBPostgresStatement(stmt.Instance);
   for i := 0 to cnt - 1 do
   begin
-    pStmt.Bind(1, ComputeRandomWorld);
+    pStmt.Bind(1, ComputeRandomWorld(gen));
     pStmt.SendPipelinePrepared;
     pConn.PipelineSync;
   end;
@@ -323,7 +330,7 @@ function TRawAsyncServer.db(ctxt: THttpServerRequest): cardinal;
 var
   w: TOrmWorld;
 begin
-  w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
+  w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(Lecuyer));
   try
     ctxt.SetOutJson(w);
     result := HTTP_SUCCESS;
@@ -336,10 +343,12 @@ function TRawAsyncServer.queries(ctxt: THttpServerRequest): cardinal;
 var
   i: PtrInt;
   res: TOrmWorlds;
+  gen: PLecuyer;
 begin
   SetLength(res, GetQueriesParamValue(ctxt, 'QUERIES='));
+  gen := Lecuyer;
   for i := 0 to length(res) - 1 do
-    res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
+    res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(gen));
   ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
   ObjArrayClear(res);
   result := HTTP_SUCCESS;
@@ -349,10 +358,12 @@ function TRawAsyncServer.cached_queries(ctxt: THttpServerRequest): cardinal;
 var
   i: PtrInt;
   res: TOrmWorlds;
+  gen: PLecuyer;
 begin
   SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
+  gen := Lecuyer;
   for i := 0 to length(res) - 1 do
-    res[i] := fCachedWorldsTable.Get(ComputeRandomWorld);
+    res[i] := fCachedWorldsTable.Get(ComputeRandomWorld(gen));
   ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
   result := HTTP_SUCCESS;
 end;
@@ -390,19 +401,21 @@ var
   res: TOrmWorlds;
   w: TOrmWorld;
   b: TRestBatch;
+  gen: PLecuyer;
 begin
   result := HTTP_SERVERERROR;
   SetLength(res, GetQueriesParamValue(ctxt));
   b := TRestBatch.Create(fStore.ORM, TOrmWorld, {transrows=}0,
     [boExtendedJson, boNoModelEncoding, boPutNoCacheFlush]);
   try
+    gen := Lecuyer;
     for i := 0 to length(res) - 1 do
     begin
       w := TOrmWorld.Create;
       res[i] := w;
-      if not fStore.Orm.Retrieve(ComputeRandomWorld, w) then
+      if not fStore.Orm.Retrieve(ComputeRandomWorld(gen), w) then
         exit;
-      w.RandomNumber := ComputeRandomWorld;
+      w.RandomNumber := ComputeRandomWorld(gen);
       b.Update(w);
     end;
     result := b.Send;
@@ -422,7 +435,7 @@ begin
   result := HTTP_SERVERERROR;
   conn := fDbPool.ThreadSafeConnection;
   stmt := conn.NewStatementPrepared(WORLD_READ_SQL, true, true);
-  stmt.Bind(1, ComputeRandomWorld);
+  stmt.Bind(1, ComputeRandomWorld(Lecuyer));
   stmt.ExecutePrepared;
   if stmt.Step then
   begin
@@ -444,6 +457,20 @@ begin
   result := HTTP_SUCCESS;
 end;
 
+function TRawAsyncServer.rawcached(ctxt: THttpServerRequest): cardinal;
+var
+  i: PtrInt;
+  res: TOrmWorlds;
+  gen: PLecuyer;
+begin
+  SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
+  gen := Lecuyer;
+  for i := 0 to length(res) - 1 do
+    res[i] := fRawCache[ComputeRandomWorld(gen) - 1];
+  ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
+  result := HTTP_SUCCESS;
+end;
+
 function TRawAsyncServer.rawfortunes(ctxt: THttpServerRequest): cardinal;
 var
   conn: TSqlDBConnection;
@@ -476,10 +503,7 @@ begin
     try
       W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
       for i := 1 to cnt do
-      begin
-        W.AddShort('(?::integer, ?::integer)');
-        W.Add(',');
-      end;
+        W.AddShort('(?::integer, ?::integer),');
       W.CancelLastComma;
       W.AddShort(' order by 1) AS v (id, randomNumber) WHERE world.id = v.id');
       W.SetText(LastComputeUpdateSql);
@@ -496,6 +520,7 @@ var
   cnt, i: PtrInt;
   res: TWorlds;
   ids, nums: TInt64DynArray;
+  gen: PLecuyer;
   conn: TSqlDBConnection;
   stmt: ISqlDBStatement;
 begin
@@ -505,8 +530,9 @@ begin
   if not getRawRandomWorlds(cnt, res) then
     exit;
   // generate new randoms
+  gen := Lecuyer;
   for i := 0 to cnt - 1 do
-    res[i].randomNumber := ComputeRandomWorld;
+    res[i].randomNumber := ComputeRandomWorld(gen);
   if cnt > 20 then
   begin
     // fill parameters arrays for update with nested select (PostgreSQL only)
@@ -546,7 +572,7 @@ function TRawAsyncServer.asyncdb(ctxt: THttpServerRequest): cardinal;
 begin
   with fDbPool.Async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
   try
-    Bind(1, ComputeRandomWorld);
+    Bind(1, ComputeRandomWorld(Lecuyer));
     ExecuteAsync(ctxt, OnAsyncDb);
   finally
     UnLock;
@@ -591,8 +617,9 @@ type
     res: TWorlds;
     count, current: integer;
     update: TSqlDBPostgresAsyncStatement; // prepared before any callback
-    function Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
-    function Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
+    async: TSqlDBPostgresAsync;
+    function Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
+    function Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
     procedure DoUpdates;
     procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
     procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
@@ -600,48 +627,51 @@ type
 
 function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
 begin
-  result := TAsyncWorld.Create.Queries(fDBPool.Async, ctxt);
+  result := TAsyncWorld.Create.Queries(self, ctxt);
 end;
 
 function TRawAsyncServer.asyncupdates(ctxt: THttpServerRequest): cardinal;
 begin
-  result := TAsyncWorld.Create.Updates(fDBPool.Async, ctxt);
+  result := TAsyncWorld.Create.Updates(self, ctxt);
 end;
 
 
 { TAsyncWorld }
 
-function TAsyncWorld.Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
+function TAsyncWorld.Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
 var
   n: integer;
-  opt: TSqlDBPostgresAsyncStatementOptions; // for modified libpq
+  opt: TSqlDBPostgresAsyncStatementOptions; // forced options for modified libpq
+  gen: PLecuyer;
+  select: TSqlDBPostgresAsyncStatement;
 begin
   request := ctxt;
+  if async = nil then
+    async := server.fDbPool.Async;
   if count = 0 then
     count := getQueriesParamValue(ctxt);
   SetLength(res, count); // count is > 0
-  with async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
-  try
-    opt := AsyncOptions - [asoForceConnectionFlush];
-    n := count;
-    repeat
-      dec(n);
-      Bind(1, ComputeRandomWorld);
-      if n = 0 then // last item should include asoForceConnectionFlush (if set)
-        opt := AsyncOptions;
-      ExecuteAsync(ctxt, OnQueries, @opt);
-    until n = 0;
-  finally
-    UnLock;
-  end;
+  select := async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT);
+  opt := ASYNC_OPT - [asoForceConnectionFlush];
+  n := count;
+  gen := Lecuyer;
+  repeat
+    dec(n);
+    select.Bind(1, ComputeRandomWorld(gen));
+    if n = 0 then // last item should include asoForceConnectionFlush (if set)
+      opt := ASYNC_OPT;
+    select.ExecuteAsync(ctxt, OnQueries, @opt);
+  until n = 0;
+  select.UnLock;
   result := ctxt.SetAsyncResponse;
 end;
 
-function TAsyncWorld.Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
+function TAsyncWorld.Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
 begin
+  async := server.fDbPool.Async;
   count := getQueriesParamValue(ctxt);
   update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT);
-  result := Queries(async, ctxt);
+  result := Queries(server, ctxt);
 end;
 
 procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
@@ -666,9 +696,11 @@ procedure TAsyncWorld.DoUpdates;
 var
   i: PtrInt;
   params: TIntegerDynArray;
+  gen: PLecuyer;
 begin
+  gen := Lecuyer;
   for i := 0 to count - 1 do
-    res[i].randomNumber := ComputeRandomWorld;
+    res[i].randomNumber := ComputeRandomWorld(gen);
   SetLength(params, count);
   for i := 0 to count - 1 do
     params[i] := res[i].id;
@@ -708,8 +740,8 @@ begin
   // register some RTTI for records JSON serialization
   Rtti.RegisterFromText([
     TypeInfo(TMessageRec), 'message:PUtf8Char',
-    TypeInfo(TWorldRec),   'id,randomNumber:integer',
-    TypeInfo(TFortune),    'id:integer message:PUtf8Char']);
+    TypeInfo(TWorldRec),   'id,randomNumber:cardinal',
+    TypeInfo(TFortune),    'id:PtrUInt message:PUtf8Char']);
 
   // compute default execution context from HW information
   cpuCount := CurrentCpuSet(cpuMask); // may run from a "taskset" command
@@ -731,7 +763,7 @@ begin
     begin
       // asynchronus test with single listener socket and no CPU pinning
       servers := 1;
-      threads := cpuCount * 2;
+      threads := cpuCount * 4;
       pinServers2Cores := false;
     end;
   end
@@ -753,12 +785,7 @@ begin
       pinServers2Cores := false; // no option would keep the default boolean
     Get(['s', 'servers'], servers, '#count of servers (listener sockets)', servers);
     Get(['t', 'threads'], threads, 'per-server thread pool #size', threads);
-    if Option(['?', 'help'], 'display this message') then
-    begin
-      ConsoleWrite(FullDescription);
-      exit;
-    end;
-    if ConsoleWriteUnknown then
+    if ConsoleHelpFailed('TFB Server using mORMot 2') then
       exit;
   end;