Переглянути джерело

[mormot] improved async implementation; new test case with one listener (#8207)

* [mormot] improved async implementation; new test case with single listener

* [mormot] correctly configure TFB_TEST_NAME env var for use within the app

---------

Co-authored-by: pavel.mash <[email protected]>
pavelmash 2 роки тому
батько
коміт
0ec0efc821

+ 22 - 0
frameworks/Pascal/mormot/benchmark_config.json

@@ -69,6 +69,28 @@
         "display_name": "mormot [async]",
         "notes": "",
         "versus": "None"
+      },
+      "postgres-async2": {
+        "dockerfile": "mormot.dockerfile",
+        "db_url": "/asyncdb",
+        "query_url": "/asyncqueries?queries=",
+        "fortune_url": "/asyncfortunes",
+        "update_url": "/asyncupdates?queries=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Fullstack",
+        "database": "postgres",
+        "framework": "mormot",
+        "language": "Pascal",
+        "flavor": "None",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "None",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "mormot [async,nopin]",
+        "notes": "",
+        "versus": "None"
       }
     }
   ]

+ 5 - 0
frameworks/Pascal/mormot/mormot.dockerfile

@@ -10,9 +10,14 @@ COPY setup_and_build.sh .
 RUN /bin/bash -c ./setup_and_build.sh
 
 FROM ubuntu:22.04
+
+ARG TFB_TEST_NAME
+
 COPY --from=builder /build/bin/fpc-x86_64-linux/raw /usr/local/bin/raw
 COPY --from=builder /build/libpq.so.5.16 /usr/lib/x86_64-linux-gnu/libpq.so.5
 
+ENV TFB_TEST_NAME=$TFB_TEST_NAME
+
 EXPOSE 8080
 CMD ["raw"]
 

+ 1 - 1
frameworks/Pascal/mormot/setup_and_build.sh

@@ -35,7 +35,7 @@ echo "Download statics from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot/static
 
 # uncomment for fixed commit URL
-URL=https://github.com/synopse/mORMot2/tarball/bda095131d3f848b070981ab155adc9a73c0e75c
+URL=https://github.com/synopse/mORMot2/tarball/c66f01232083104d7b84757ae14b4af8351ae6ae
 #URL="https://api.github.com/repos/synopse/mORMot2/tarball/$USED_TAG"
 echo "Download and unpacking mORMot sources from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot  --strip-components=1

+ 61 - 59
frameworks/Pascal/mormot/src/raw.pas

@@ -82,13 +82,11 @@ type
   TRawAsyncServer = class(TSynPersistent)
   protected
     fHttpServer: THttpAsyncServer;
-    fDbPool: TSqlDBConnectionProperties;
     fModel: TOrmModel;
     fStore: TRestServerDB;
     fTemplate: TSynMustache;
     fCachedWorldsTable: POrmCacheTable;
-    fAsyncWorldRead, fAsyncFortunesRead: TSqlDBPostgresAsyncStatement;
-    fAsyncWorldUpdate: TSqlDBPostgresAsyncStatement;
+    fDbPool: TSqlDBPostgresConnectionProperties;
     procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
     procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
     // pipelined reading as used by /rawqueries and /rawupdates
@@ -96,7 +94,7 @@ type
     function ComputeRawFortunes(stmt: TSqlDBStatement; ctxt: THttpServerRequest): integer;
   public
     constructor Create(threadCount: integer; flags: THttpServerOptions;
-      pin2Core: integer = -1); reintroduce;
+      pin2Core: integer); reintroduce;
     destructor Destroy; override;
   published
     // all service URI are implemented by these published methods using RTTI
@@ -187,16 +185,6 @@ begin
   fStore := TRestServerDB.Create(fModel, SQLITE_MEMORY_DATABASE_NAME);
   fStore.NoAjaxJson := true;
   fStore.Server.CreateMissingTables; // create SQlite3 virtual tables
-  with (fDBPool as TSqlDBPostgresConnectionProperties).Async do
-  begin
-    fAsyncWorldRead := NewStatement(WORLD_READ_SQL,
-      [asoForceConnectionFlush, asoForcePipelineSync]);
-    fAsyncFortunesRead := NewStatement(FORTUNES_SQL,
-      [asoForceConnectionFlush, asoForcePipelineSync]);
-    fAsyncWorldUpdate := NewStatement(WORLD_UPDATE_SQLN,
-      [asoForceConnectionFlush, asoForcePipelineSync, asoExpectNoResult]);
-    // no SetThreadCpuAffinity(fAsyncWorldRead.Owner.Thread, pin2Core) needed
-  end;
   // pre-fill the ORM
   if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
     fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
@@ -543,14 +531,18 @@ end;
 
 // asynchronous PostgreSQL pipelined DB access
 
+const
+  // follow TFB requirements, and potential patched libpq
+  ASYNC_OPT = [asoForceConnectionFlush, asoForcePipelineSync];
+
 function TRawAsyncServer.asyncdb(ctxt: THttpServerRequest): cardinal;
 begin
-  fAsyncWorldRead.Lock;
+  with fDbPool.Async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
   try
-    fAsyncWorldRead.Bind(1, ComputeRandomWorld);
-    fAsyncWorldRead.ExecuteAsync(ctxt, OnAsyncDb);
+    Bind(1, ComputeRandomWorld);
+    ExecuteAsync(ctxt, OnAsyncDb);
   finally
-    fAsyncWorldRead.UnLock;
+    UnLock;
   end;
   result := ctxt.SetAsyncResponse;
 end;
@@ -571,7 +563,8 @@ end;
 
 function TRawAsyncServer.asyncfortunes(ctxt: THttpServerRequest): cardinal;
 begin
-  fAsyncFortunesRead.ExecuteAsyncNoParam(ctxt, OnAsyncFortunes);
+  fDbPool.Async.PrepareLocked(FORTUNES_SQL, {res=}true, ASYNC_OPT).
+    ExecuteAsyncNoParam(ctxt, OnAsyncFortunes);
   result := ctxt.SetAsyncResponse;
 end;
 
@@ -587,13 +580,12 @@ type
   // simple state machine used for /asyncqueries and /asyncupdates
   TAsyncWorld = class
   public
-    server: TRawAsyncServer;
     request: THttpServerRequest;
     res: TWorlds;
-    count: PtrInt;
-    fromupdates: boolean;
-    function Queries(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
-    function Updates(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
+    count, current: integer;
+    update: TSqlDBPostgresAsyncStatement; // prepared before any callback
+    function Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
+    function Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
     procedure DoUpdates;
     procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
     procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
@@ -601,47 +593,48 @@ type
 
 function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
 begin
-  result := TAsyncWorld.Create.Queries(self, ctxt);
+  result := TAsyncWorld.Create.Queries(fDBPool.Async, ctxt);
 end;
 
 function TRawAsyncServer.asyncupdates(ctxt: THttpServerRequest): cardinal;
 begin
-  result := TAsyncWorld.Create.Updates(self, ctxt);
+  result := TAsyncWorld.Create.Updates(fDBPool.Async, ctxt);
 end;
 
 
 { TAsyncWorld }
 
-function TAsyncWorld.Queries(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
+function TAsyncWorld.Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
 var
-  n: PtrInt;
+  n: integer;
   opt: TSqlDBPostgresAsyncStatementOptions; // for modified libpq
 begin
-  server := owner;
   request := ctxt;
-  n := getQueriesParamValue(ctxt);
-  SetLength(res, n); // n is > 0
-  server.fAsyncWorldRead.Lock;
+  if count = 0 then
+    count := getQueriesParamValue(ctxt);
+  SetLength(res, count); // count is > 0
+  with async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
   try
-    opt := server.fAsyncWorldRead.AsyncOptions - [asoForceConnectionFlush];
+    opt := AsyncOptions - [asoForceConnectionFlush];
+    n := count;
     repeat
       dec(n);
-      server.fAsyncWorldRead.Bind(1, ComputeRandomWorld);
-      if n = 0 then // last item
-        opt := server.fAsyncWorldRead.AsyncOptions;
-      server.fAsyncWorldRead.ExecuteAsync(ctxt, OnQueries, @opt);
+      Bind(1, ComputeRandomWorld);
+      if n = 0 then // last item should include asoForceConnectionFlush (if set)
+        opt := AsyncOptions;
+      ExecuteAsync(ctxt, OnQueries, @opt);
     until n = 0;
   finally
-    server.fAsyncWorldRead.UnLock;
+    UnLock;
   end;
   result := ctxt.SetAsyncResponse;
 end;
 
-function TAsyncWorld.Updates(owner: TRawAsyncServer;
-  ctxt: THttpServerRequest): cardinal;
+function TAsyncWorld.Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
 begin
-  fromupdates := true;
-  result := Queries(owner, ctxt);
+  count := getQueriesParamValue(ctxt);
+  update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT);
+  result := Queries(async, ctxt);
 end;
 
 procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
@@ -649,14 +642,14 @@ procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
 begin
   if (Statement <> nil) and
      Statement.Step then
-    with res[count] do
+    with res[current] do
     begin
       id := Statement.ColumnInt(0);
       randomNumber := Statement.ColumnInt(1);
     end;
-  inc(count);
-  if count = length(res) then // we retrieved all SELECT
-    if fromupdates then
+  inc(current);
+  if current = count then // we retrieved all SELECT
+    if Assigned(update) then
       DoUpdates
     else
       OnRes(Statement, Context);
@@ -670,16 +663,15 @@ begin
   setLength(ids{%H-}, count);
   setLength(nums{%H-}, count);
   for i := 0 to count - 1 do
-  with res[i] do
-  begin
-    randomNumber := ComputeRandomWorld;
-    ids[i] := id;
-    nums[i] := randomNumber;
-  end;
-  // note: no need of server.fAsyncWorldUpdate.Lock/UnLock inside the callbacks
-  server.fAsyncWorldUpdate.BindArray(1, ids);
-  server.fAsyncWorldUpdate.BindArray(2, nums);
-  server.fAsyncWorldUpdate.ExecuteAsync(request, OnRes);
+    with res[i] do
+    begin
+      randomNumber := ComputeRandomWorld;
+      ids[i] := id;
+      nums[i] := randomNumber;
+    end;
+  update.BindArray(1, ids);
+  update.BindArray(2, nums);
+  update.ExecuteAsync(request, OnRes);
 end;
 
 procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
@@ -728,6 +720,14 @@ begin
       // asynchronus test
       servers := cpuCount * 2;
       threads := 1;
+    end
+    else
+    if GetEnvironmentVariable('TFB_TEST_NAME') = 'mormot-postgres-async2' then
+    begin
+      // asynchronus test with single listener socket and no CPU pinning
+      servers := 1;
+      threads := cpuCount * 2;
+      pinServers2Cores := false;
     end;
   end
   else
@@ -762,10 +762,11 @@ begin
   if servers > 1 then
     include(flags, hsoReusePort) // allow several bindings on the same port
   else
-    pinServers2Cores := false;   // don't make any sense
+    pinServers2Cores := false;   // pinning a single server won't make any sense
   SetLength(rawServers{%H-}, servers);
   cpuIdx := -1; // do not pin to CPU by default
-  for i := 0 to servers - 1 do begin
+  for i := 0 to servers - 1 do
+  begin
     if pinServers2Cores then
     begin
       k := i mod cpuCount;
@@ -805,7 +806,8 @@ begin
       writeln('Per-server accepted connections:');
       for i := 0 to servers - 1 do
         write(' ', rawServers[i].fHttpServer.Async.Accepted);
-      writeln(#10'Please wait: Shutdown ', servers, ' servers');
+      writeln(#10'Please wait: Shutdown ', servers, ' servers and ',
+        threads * servers, ' threads');
     end;
   finally
     // clear all server instance(s)