|
@@ -6,7 +6,7 @@ program raw;
|
|
|
in modern pascal and the mORMot 2 framework
|
|
|
----------------------------------------------------
|
|
|
https://github.com/TechEmpower/FrameworkBenchmarks/wiki
|
|
|
- command line optional syntax: raw [threads] [cores] [servers]
|
|
|
+ command line optional syntax: run "raw -?"
|
|
|
}
|
|
|
|
|
|
{$I mormot.defines.inc}
|
|
@@ -80,15 +80,20 @@ type
|
|
|
|
|
|
// main server class
|
|
|
TRawAsyncServer = class(TSynPersistent)
|
|
|
- private
|
|
|
+ protected
|
|
|
fHttpServer: THttpAsyncServer;
|
|
|
fDbPool: TSqlDBConnectionProperties;
|
|
|
fModel: TOrmModel;
|
|
|
fStore: TRestServerDB;
|
|
|
fTemplate: TSynMustache;
|
|
|
- protected
|
|
|
- // as used by /rawqueries and /rawupdates
|
|
|
+ fCachedWorldsTable: POrmCacheTable;
|
|
|
+ fAsyncWorldRead, fAsyncFortunesRead: TSqlDBPostgresAsyncStatement;
|
|
|
+ fAsyncWorldUpdate: TSqlDBPostgresAsyncStatement;
|
|
|
+ procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
+ procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
+ // pipelined reading as used by /rawqueries and /rawupdates
|
|
|
function GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
|
|
|
+ function ComputeRawFortunes(stmt: TSqlDBStatement; ctxt: THttpServerRequest): integer;
|
|
|
public
|
|
|
constructor Create(threadCount: integer; flags: THttpServerOptions;
|
|
|
pin2Core: integer = -1); reintroduce;
|
|
@@ -106,6 +111,11 @@ type
|
|
|
function rawqueries(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
+ // asynchronous PostgreSQL pipelined DB access
|
|
|
+ function asyncdb(ctxt: THttpServerRequest): cardinal;
|
|
|
+ function asyncqueries(ctxt: THttpServerRequest): cardinal;
|
|
|
+ function asyncfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
+ function asyncupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
end;
|
|
|
|
|
|
{$I-}
|
|
@@ -116,7 +126,7 @@ const
|
|
|
|
|
|
WORLD_COUNT = 10000;
|
|
|
WORLD_READ_SQL = 'select id,randomNumber from World where id=?';
|
|
|
- WORLD_UPDATE_SQLN ='update World as t set randomNumber = v.r from ' +
|
|
|
+ WORLD_UPDATE_SQLN = 'update World as t set randomNumber = v.r from ' +
|
|
|
'(SELECT unnest(?::bigint[]), unnest(?::bigint[]) order by 1) as v(id, r)' +
|
|
|
' where t.id = v.id';
|
|
|
FORTUNES_SQL = 'select id,message from Fortune';
|
|
@@ -177,8 +187,20 @@ begin
|
|
|
fStore := TRestServerDB.Create(fModel, SQLITE_MEMORY_DATABASE_NAME);
|
|
|
fStore.NoAjaxJson := true;
|
|
|
fStore.Server.CreateMissingTables; // create SQlite3 virtual tables
|
|
|
+ with (fDBPool as TSqlDBPostgresConnectionProperties).Async do
|
|
|
+ begin
|
|
|
+ fAsyncWorldRead := NewStatement(WORLD_READ_SQL,
|
|
|
+ [asoForceConnectionFlush, asoForcePipelineSync]);
|
|
|
+ fAsyncFortunesRead := NewStatement(FORTUNES_SQL,
|
|
|
+ [asoForceConnectionFlush, asoForcePipelineSync]);
|
|
|
+ fAsyncWorldUpdate := NewStatement(WORLD_UPDATE_SQLN,
|
|
|
+ [asoForceConnectionFlush, asoForcePipelineSync, asoExpectNoResult]);
|
|
|
+ // no SetThreadCpuAffinity(fAsyncWorldRead.Owner.Thread, pin2Core) needed
|
|
|
+ end;
|
|
|
+ // pre-fill the ORM
|
|
|
if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
|
|
|
fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
|
|
|
+ fCachedWorldsTable := fStore.Orm.Cache.Table(TOrmCachedWorld);
|
|
|
// initialize the mustache template for /fortunes
|
|
|
fTemplate := TSynMustache.Parse(FORTUNES_TPL);
|
|
|
// setup the HTTP server
|
|
@@ -219,7 +241,7 @@ end;
|
|
|
function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
|
|
|
var
|
|
|
conn: TSqlDBConnection;
|
|
|
- stmt: ISQLDBStatement;
|
|
|
+ stmt: ISqlDBStatement;
|
|
|
pConn: TSqlDBPostgresConnection absolute conn;
|
|
|
pStmt: TSqlDBPostgresStatement;
|
|
|
i: PtrInt;
|
|
@@ -239,7 +261,6 @@ begin
|
|
|
pStmt.SendPipelinePrepared;
|
|
|
pConn.PipelineSync;
|
|
|
end;
|
|
|
- pConn.Flush; // in case we use modified libpq what not flush inside PQPipelineSync - flush manually
|
|
|
for i := 0 to cnt - 1 do
|
|
|
begin
|
|
|
pStmt.GetPipelineResult;
|
|
@@ -254,6 +275,38 @@ begin
|
|
|
result := true;
|
|
|
end;
|
|
|
|
|
|
+function FortuneCompareByMessage(const A, B): integer;
|
|
|
+begin
|
|
|
+ result := StrComp(pointer(TFortune(A).message), pointer(TFortune(B).message));
|
|
|
+end;
|
|
|
+
|
|
|
+function TRawAsyncServer.ComputeRawFortunes(
|
|
|
+ stmt: TSqlDBStatement; ctxt: THttpServerRequest): integer;
|
|
|
+var
|
|
|
+ list: TFortunes;
|
|
|
+ arr: TDynArray;
|
|
|
+ n: integer;
|
|
|
+ f: ^TFortune;
|
|
|
+begin
|
|
|
+ result := HTTP_BADREQUEST;
|
|
|
+ if stmt = nil then
|
|
|
+ exit;
|
|
|
+ arr.Init(TypeInfo(TFortunes), list, @n);
|
|
|
+ while stmt.Step do
|
|
|
+ begin
|
|
|
+ f := arr.NewPtr;
|
|
|
+ f.id := stmt.ColumnInt(0);
|
|
|
+ f.message := stmt.ColumnUtf8(1);
|
|
|
+ end;
|
|
|
+ f := arr.NewPtr;
|
|
|
+ f.id := 0;
|
|
|
+ f.message := FORTUNES_MESSAGE;
|
|
|
+ arr.Sort(FortuneCompareByMessage);
|
|
|
+ ctxt.OutContent := fTemplate.RenderDataArray(arr);
|
|
|
+ ctxt.OutContentType := HTML_CONTENT_TYPE;
|
|
|
+ result := HTTP_SUCCESS;
|
|
|
+end;
|
|
|
+
|
|
|
// following methods implement the server endpoints
|
|
|
|
|
|
function TRawAsyncServer.plaintext(ctxt: THttpServerRequest): cardinal;
|
|
@@ -302,12 +355,10 @@ function TRawAsyncServer.cached_queries(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
i: PtrInt;
|
|
|
res: TOrmWorlds;
|
|
|
- cache: POrmCacheTable;
|
|
|
begin
|
|
|
- cache := fStore.Orm.Cache.Table(TOrmCachedWorld);
|
|
|
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
|
|
|
for i := 0 to length(res) - 1 do
|
|
|
- res[i] := cache.Get(ComputeRandomWorld);
|
|
|
+ res[i] := fCachedWorldsTable.Get(ComputeRandomWorld);
|
|
|
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
@@ -372,7 +423,7 @@ end;
|
|
|
function TRawAsyncServer.rawdb(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
conn: TSqlDBConnection;
|
|
|
- stmt: ISQLDBStatement;
|
|
|
+ stmt: ISqlDBStatement;
|
|
|
begin
|
|
|
result := HTTP_SERVERERROR;
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
@@ -399,37 +450,15 @@ begin
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
|
|
|
|
-function FortuneCompareByMessage(const A, B): integer;
|
|
|
-begin
|
|
|
- result := StrComp(pointer(TFortune(A).message), pointer(TFortune(B).message));
|
|
|
-end;
|
|
|
-
|
|
|
function TRawAsyncServer.rawfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
conn: TSqlDBConnection;
|
|
|
- stmt: ISQLDBStatement;
|
|
|
- list: TFortunes;
|
|
|
- arr: TDynArray;
|
|
|
- n: integer;
|
|
|
- f: ^TFortune;
|
|
|
+ stmt: ISqlDBStatement;
|
|
|
begin
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
|
stmt := conn.NewStatementPrepared(FORTUNES_SQL, true, true);
|
|
|
stmt.ExecutePrepared;
|
|
|
- arr.Init(TypeInfo(TFortunes), list, @n);
|
|
|
- while stmt.Step do
|
|
|
- begin
|
|
|
- f := arr.NewPtr;
|
|
|
- f.id := stmt.ColumnInt(0);
|
|
|
- f.message := stmt.ColumnUtf8(1);
|
|
|
- end;
|
|
|
- f := arr.NewPtr;
|
|
|
- f.id := 0;
|
|
|
- f.message := FORTUNES_MESSAGE;
|
|
|
- arr.Sort(FortuneCompareByMessage);
|
|
|
- ctxt.OutContent := fTemplate.RenderDataArray(arr);
|
|
|
- ctxt.OutContentType := HTML_CONTENT_TYPE;
|
|
|
- result := HTTP_SUCCESS;
|
|
|
+ result := ComputeRawFortunes(stmt.Instance, ctxt);
|
|
|
end;
|
|
|
|
|
|
var
|
|
@@ -473,7 +502,7 @@ var
|
|
|
res: TWorlds;
|
|
|
ids, nums: TInt64DynArray;
|
|
|
conn: TSqlDBConnection;
|
|
|
- stmt: ISQLDBStatement;
|
|
|
+ stmt: ISqlDBStatement;
|
|
|
begin
|
|
|
result := HTTP_SERVERERROR;
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
@@ -512,94 +541,163 @@ begin
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
|
|
|
|
+// asynchronous PostgreSQL pipelined DB access
|
|
|
|
|
|
+function TRawAsyncServer.asyncdb(ctxt: THttpServerRequest): cardinal;
|
|
|
+begin
|
|
|
+ fAsyncWorldRead.Lock;
|
|
|
+ try
|
|
|
+ fAsyncWorldRead.Bind(1, ComputeRandomWorld);
|
|
|
+ fAsyncWorldRead.ExecuteAsync(ctxt, OnAsyncDb);
|
|
|
+ finally
|
|
|
+ fAsyncWorldRead.UnLock;
|
|
|
+ end;
|
|
|
+ result := ctxt.SetAsyncResponse;
|
|
|
+end;
|
|
|
|
|
|
+procedure TRawAsyncServer.OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
+ Context: TObject);
|
|
|
var
|
|
|
- rawServers: array of TRawAsyncServer;
|
|
|
- threads, servers, i, k, cpuIdx: integer;
|
|
|
- pinServers2Cores: boolean;
|
|
|
- cpuMask: TCpuSet;
|
|
|
- accessibleCPUCount: PtrInt;
|
|
|
- flags: THttpServerOptions;
|
|
|
+ ctxt: THttpServerRequest absolute Context;
|
|
|
+begin
|
|
|
+ if (Statement = nil) or
|
|
|
+ not Statement.Step then
|
|
|
+ ctxt.ErrorMessage := 'asyncdb failed'
|
|
|
+ else
|
|
|
+ ctxt.SetOutJson('{"id":%,"randomNumber":%}',
|
|
|
+ [Statement.ColumnInt(0), Statement.ColumnInt(1)]);
|
|
|
+ ctxt.OnAsyncResponse(ctxt);
|
|
|
+end;
|
|
|
|
|
|
-function FindCmdLineSwitchVal(const Switch: string; out Value: string): Boolean;
|
|
|
+function TRawAsyncServer.asyncfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
+begin
|
|
|
+ fAsyncFortunesRead.ExecuteAsyncNoParam(ctxt, OnAsyncFortunes);
|
|
|
+ result := ctxt.SetAsyncResponse;
|
|
|
+end;
|
|
|
+
|
|
|
+procedure TRawAsyncServer.OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
+ Context: TObject);
|
|
|
var
|
|
|
- I, L: integer;
|
|
|
- S, T: string;
|
|
|
-begin
|
|
|
- Result := False;
|
|
|
- S := Switch;
|
|
|
- Value := '';
|
|
|
- S := UpperCase(S);
|
|
|
- I := ParamCount;
|
|
|
- while (Not Result) and (I>0) do
|
|
|
- begin
|
|
|
- L := Length(Paramstr(I));
|
|
|
- if (L>0) and (ParamStr(I)[1] in SwitchChars) then
|
|
|
- begin
|
|
|
- T := Copy(ParamStr(I),2,L-1);
|
|
|
- T := UpperCase(T);
|
|
|
- Result := S=T;
|
|
|
- if Result and (I <> ParamCount) then
|
|
|
- Value := ParamStr(I+1)
|
|
|
- end;
|
|
|
- Dec(i);
|
|
|
+ ctxt: THttpServerRequest absolute Context;
|
|
|
+begin
|
|
|
+ ctxt.OnAsyncResponse(ctxt, ComputeRawFortunes(Statement, ctxt));
|
|
|
+end;
|
|
|
+
|
|
|
+type
|
|
|
+ // simple state machine used for /asyncqueries and /asyncupdates
|
|
|
+ TAsyncWorld = class
|
|
|
+ public
|
|
|
+ server: TRawAsyncServer;
|
|
|
+ request: THttpServerRequest;
|
|
|
+ res: TWorlds;
|
|
|
+ count: PtrInt;
|
|
|
+ fromupdates: boolean;
|
|
|
+ function Queries(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
+ function Updates(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
+ procedure DoUpdates;
|
|
|
+ procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
+ procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
end;
|
|
|
+
|
|
|
+function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
|
|
|
+begin
|
|
|
+ result := TAsyncWorld.Create.Queries(self, ctxt);
|
|
|
end;
|
|
|
|
|
|
-procedure ComputeExecutionContextFromParams(cpusAccessible: PtrInt);
|
|
|
-var
|
|
|
- sw: string;
|
|
|
+function TRawAsyncServer.asyncupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
- // user specified some values at command line: raw [-s serversCount] [-t threadsPerServer] [-p]
|
|
|
- if not FindCmdLineSwitchVal('t', sw) or not TryStrToInt(sw, threads) then
|
|
|
- threads := cpusAccessible * 4;
|
|
|
- if not FindCmdLineSwitchVal('s', sw) or not TryStrToInt(sw, servers) then
|
|
|
- servers := 1;
|
|
|
- pinServers2Cores := FindCmdLineSwitch('p', true) or FindCmdLineSwitch('-pin', true);
|
|
|
- if threads < 1 then
|
|
|
- threads := 1
|
|
|
- else if threads > 256 then
|
|
|
- threads := 256; // max. threads for THttpAsyncServer
|
|
|
-
|
|
|
- if servers < 1 then
|
|
|
- servers := 1
|
|
|
- else if servers > 256 then
|
|
|
- servers := 256;
|
|
|
+ result := TAsyncWorld.Create.Updates(self, ctxt);
|
|
|
end;
|
|
|
|
|
|
-procedure ComputeExecutionContextFromNumberOfProcessors(cpusAccessible: PtrInt);
|
|
|
+
|
|
|
+{ TAsyncWorld }
|
|
|
+
|
|
|
+function TAsyncWorld.Queries(owner: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
+var
|
|
|
+ n: PtrInt;
|
|
|
+ opt: TSqlDBPostgresAsyncStatementOptions; // for modified libpq
|
|
|
begin
|
|
|
- // automatically guess best parameters depending on available CPU cores
|
|
|
- if cpusAccessible >= 6 then
|
|
|
- begin
|
|
|
- // scale using several listeners (one per core)
|
|
|
- // see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
|
|
|
- servers := cpusAccessible;
|
|
|
- threads := 8;
|
|
|
- pinServers2Cores := true;
|
|
|
- end
|
|
|
- else
|
|
|
- begin
|
|
|
- // low-level CPU - a single instance and a few threads per core
|
|
|
- servers := 1;
|
|
|
- threads := cpusAccessible * 4;
|
|
|
- pinServers2Cores := false;
|
|
|
+ server := owner;
|
|
|
+ request := ctxt;
|
|
|
+ n := getQueriesParamValue(ctxt);
|
|
|
+ SetLength(res, n); // n is > 0
|
|
|
+ server.fAsyncWorldRead.Lock;
|
|
|
+ try
|
|
|
+ opt := server.fAsyncWorldRead.AsyncOptions - [asoForceConnectionFlush];
|
|
|
+ repeat
|
|
|
+ dec(n);
|
|
|
+ server.fAsyncWorldRead.Bind(1, ComputeRandomWorld);
|
|
|
+ if n = 0 then // last item
|
|
|
+ opt := server.fAsyncWorldRead.AsyncOptions;
|
|
|
+ server.fAsyncWorldRead.ExecuteAsync(ctxt, OnQueries, @opt);
|
|
|
+ until n = 0;
|
|
|
+ finally
|
|
|
+ server.fAsyncWorldRead.UnLock;
|
|
|
end;
|
|
|
+ result := ctxt.SetAsyncResponse;
|
|
|
+end;
|
|
|
+
|
|
|
+function TAsyncWorld.Updates(owner: TRawAsyncServer;
|
|
|
+ ctxt: THttpServerRequest): cardinal;
|
|
|
+begin
|
|
|
+ fromupdates := true;
|
|
|
+ result := Queries(owner, ctxt);
|
|
|
end;
|
|
|
|
|
|
+procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
+ Context: TObject);
|
|
|
+begin
|
|
|
+ if (Statement <> nil) and
|
|
|
+ Statement.Step then
|
|
|
+ with res[count] do
|
|
|
+ begin
|
|
|
+ id := Statement.ColumnInt(0);
|
|
|
+ randomNumber := Statement.ColumnInt(1);
|
|
|
+ end;
|
|
|
+ inc(count);
|
|
|
+ if count = length(res) then // we retrieved all SELECT
|
|
|
+ if fromupdates then
|
|
|
+ DoUpdates
|
|
|
+ else
|
|
|
+ OnRes(Statement, Context);
|
|
|
+end;
|
|
|
+
|
|
|
+procedure TAsyncWorld.DoUpdates;
|
|
|
+var
|
|
|
+ i: PtrInt;
|
|
|
+ ids, nums: TInt64DynArray;
|
|
|
begin
|
|
|
- if FindCmdLineSwitch('?') or FindCmdLineSwitch('h') or FindCmdLineSwitch('-help', ['-'], false) then
|
|
|
+ setLength(ids{%H-}, count);
|
|
|
+ setLength(nums{%H-}, count);
|
|
|
+ for i := 0 to count - 1 do
|
|
|
+ with res[i] do
|
|
|
begin
|
|
|
- writeln('Usage: ' + UTF8ToString(ExeVersion.ProgramName) + ' [-s serversCount] [-t threadsPerServer] [-p]');
|
|
|
- writeln('Options:');
|
|
|
- writeln(' -?, --help displays this message');
|
|
|
- writeln(' -s serversCount count of servers (listener sockets)');
|
|
|
- writeln(' -t threadsPerServer per-server thread poll size');
|
|
|
- writeln(' -p, --pin pin each server to CPU starting from 0');
|
|
|
- exit;
|
|
|
+ randomNumber := ComputeRandomWorld;
|
|
|
+ ids[i] := id;
|
|
|
+ nums[i] := randomNumber;
|
|
|
end;
|
|
|
+ // note: no need of server.fAsyncWorldUpdate.Lock/UnLock inside the callbacks
|
|
|
+ server.fAsyncWorldUpdate.BindArray(1, ids);
|
|
|
+ server.fAsyncWorldUpdate.BindArray(2, nums);
|
|
|
+ server.fAsyncWorldUpdate.ExecuteAsync(request, OnRes);
|
|
|
+end;
|
|
|
+
|
|
|
+procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
+ Context: TObject);
|
|
|
+begin
|
|
|
+ request.SetOutJson(@res, TypeInfo(TWorlds));
|
|
|
+ request.OnAsyncResponse(Context as THttpServerRequest);
|
|
|
+ Free; // we don't need this state machine any more
|
|
|
+end;
|
|
|
|
|
|
+
|
|
|
+var
|
|
|
+ rawServers: array of TRawAsyncServer;
|
|
|
+ threads, servers, i, k, cpuIdx, cpuCount: integer;
|
|
|
+ pinServers2Cores: boolean;
|
|
|
+ cpuMask: TCpuSet;
|
|
|
+ flags: THttpServerOptions;
|
|
|
+begin
|
|
|
// setup logs
|
|
|
{$ifdef WITH_LOGS}
|
|
|
TSynLog.Family.Level := LOG_VERBOSE; // disable logs for benchmarking
|
|
@@ -616,24 +714,61 @@ begin
|
|
|
TypeInfo(TWorldRec), 'id,randomNumber:integer',
|
|
|
TypeInfo(TFortune), 'id:integer message:RawUtf8']);
|
|
|
|
|
|
- // setup execution context
|
|
|
- accessibleCPUCount := CurrentCpuSet(cpuMask);
|
|
|
-
|
|
|
- if ParamCount > 0 then
|
|
|
- ComputeExecutionContextFromParams(accessibleCPUCount)
|
|
|
+ // compute default execution context from HW information
|
|
|
+ cpuCount := CurrentCpuSet(cpuMask); // may run from a "taskset" command
|
|
|
+ if cpuCount >= 6 then
|
|
|
+ begin
|
|
|
+ // high-end CPU would scale better using several listeners (one per core)
|
|
|
+ // see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
|
|
|
+ servers := cpuCount;
|
|
|
+ threads := 8;
|
|
|
+ pinServers2Cores := true;
|
|
|
+ if GetEnvironmentVariable('TFB_TEST_NAME') = 'mormot-postgres-async' then
|
|
|
+ begin
|
|
|
+ // asynchronus test
|
|
|
+ servers := cpuCount * 2;
|
|
|
+ threads := 1;
|
|
|
+ end;
|
|
|
+ end
|
|
|
else
|
|
|
- ComputeExecutionContextFromNumberOfProcessors(accessibleCPUCount);
|
|
|
- flags := [];
|
|
|
- if servers > 1 then
|
|
|
- include(flags, hsoReusePort); // allow several bindings on the same port
|
|
|
+ begin
|
|
|
+ // simple CPU will have a single instance and a few threads per core
|
|
|
+ servers := 1;
|
|
|
+ threads := cpuCount * 4;
|
|
|
+ pinServers2Cores := false;
|
|
|
+ end;
|
|
|
+
|
|
|
+ // parse command line parameters
|
|
|
+ with Executable.Command do
|
|
|
+ begin
|
|
|
+ ExeDescription := 'TFB Server using mORMot 2';
|
|
|
+ if Option(['p', 'pin'], 'pin each server to a CPU') then
|
|
|
+ pinServers2Cores := true;
|
|
|
+ if Option('nopin', 'disable the CPU pinning') then
|
|
|
+ pinServers2Cores := false; // no option would keep the default boolean
|
|
|
+ Get(['s', 'servers'], servers, '#count of servers (listener sockets)', servers);
|
|
|
+ Get(['t', 'threads'], threads, 'per-server thread pool #size', threads);
|
|
|
+ if Option(['?', 'help'], 'display this message') then
|
|
|
+ begin
|
|
|
+ ConsoleWrite(FullDescription);
|
|
|
+ exit;
|
|
|
+ end;
|
|
|
+ if ConsoleWriteUnknown then
|
|
|
+ exit;
|
|
|
+ end;
|
|
|
|
|
|
// start the server instance(s), in hsoReusePort mode if needed
|
|
|
+ flags := [];
|
|
|
+ if servers > 1 then
|
|
|
+ include(flags, hsoReusePort) // allow several bindings on the same port
|
|
|
+ else
|
|
|
+ pinServers2Cores := false; // don't make any sense
|
|
|
SetLength(rawServers{%H-}, servers);
|
|
|
cpuIdx := -1; // do not pin to CPU by default
|
|
|
for i := 0 to servers - 1 do begin
|
|
|
if pinServers2Cores then
|
|
|
begin
|
|
|
- k := i mod accessibleCPUCount;
|
|
|
+ k := i mod cpuCount;
|
|
|
cpuIdx := -1;
|
|
|
// find real CPU index according to the cpuMask
|
|
|
repeat
|
|
@@ -641,7 +776,7 @@ begin
|
|
|
if GetBit(cpuMask, cpuIdx) then
|
|
|
dec(k);
|
|
|
until k = -1;
|
|
|
- writeln('Pin server #', i, ' to #', cpuIdx, ' CPU');
|
|
|
+ writeln('Pin #', i, ' server to #', cpuIdx, ' CPU');
|
|
|
end;
|
|
|
rawServers[i] := TRawAsyncServer.Create(threads, flags, cpuIdx)
|
|
|
end;
|
|
@@ -651,31 +786,32 @@ begin
|
|
|
writeln;
|
|
|
writeln(rawServers[0].fHttpServer.ClassName,
|
|
|
' running on localhost:', rawServers[0].fHttpServer.SockPort);
|
|
|
- writeln(' num thread=', threads,
|
|
|
+ writeln(' num servers=', servers,
|
|
|
+ ', threads per server=', threads,
|
|
|
+ ', total threads=', threads * servers,
|
|
|
', total CPU=', SystemInfo.dwNumberOfProcessors,
|
|
|
- ', accessible CPU=', accessibleCPUCount,
|
|
|
- ', num servers=', servers,
|
|
|
+ ', accessible CPU=', cpuCount,
|
|
|
', pinned=', pinServers2Cores,
|
|
|
- ', total workers=', threads * servers,
|
|
|
', db=', rawServers[0].fDbPool.DbmsEngineName);
|
|
|
writeln(' options=', GetSetName(TypeInfo(THttpServerOptions), flags));
|
|
|
- writeln('Press [Enter] or Ctrl+C or send SIGTERM to terminate'#10);
|
|
|
+ writeln('Press [Enter] or Ctrl+C or send SIGTERM to terminate');
|
|
|
ConsoleWaitForEnterKey;
|
|
|
//TSynLog.Family.Level := LOG_VERBOSE; // enable shutdown logs for debug
|
|
|
if servers = 1 then
|
|
|
- writeln(ObjectToJsonDebug(rawServers[i].fHttpServer,
|
|
|
+ writeln(ObjectToJsonDebug(rawServers[0].fHttpServer,
|
|
|
[woDontStoreVoid, woHumanReadable]))
|
|
|
else
|
|
|
begin
|
|
|
writeln('Per-server accepted connections:');
|
|
|
for i := 0 to servers - 1 do
|
|
|
- write(rawServers[i].fHttpServer.Async.Accepted, ' ');
|
|
|
- writeln;
|
|
|
+ write(' ', rawServers[i].fHttpServer.Async.Accepted);
|
|
|
+ writeln(#10'Please wait: Shutdown ', servers, ' servers');
|
|
|
end;
|
|
|
finally
|
|
|
// clear all server instance(s)
|
|
|
ObjArrayClear(rawServers);
|
|
|
end;
|
|
|
+ write('Shutdown complete'#10);
|
|
|
{$ifdef FPC_X64MM}
|
|
|
WriteHeapStatus(' ', 16, 8, {compileflags=}true);
|
|
|
{$endif FPC_X64MM}
|