|
@@ -85,14 +85,14 @@ type
|
|
|
fModel: TOrmModel;
|
|
|
fStore: TRestServerDB;
|
|
|
fTemplate: TSynMustache;
|
|
|
- fCachedWorldsTable: POrmCacheTable;
|
|
|
+ fOrmCache: POrmCacheTable;
|
|
|
fRawCache: TOrmWorlds;
|
|
|
fDbPool: TSqlDBPostgresConnectionProperties;
|
|
|
- procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
- procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
+ procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: PtrInt);
|
|
|
+ procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: PtrInt);
|
|
|
// pipelined reading as used by /rawqueries and /rawupdates
|
|
|
function GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
|
|
|
- function ComputeRawFortunes(stmt: TSqlDBStatement; ctxt: THttpServerRequest): integer;
|
|
|
+ function ComputeRawFortunes(stmt: TSqlDBStatement): RawUtf8;
|
|
|
public
|
|
|
constructor Create(threadCount: integer; flags: THttpServerOptions;
|
|
|
pin2Core: integer); reintroduce;
|
|
@@ -170,7 +170,7 @@ begin
|
|
|
inherited Create;
|
|
|
fDbPool := TSqlDBPostgresConnectionProperties.Create(
|
|
|
'tfb-database:5432', 'hello_world', 'benchmarkdbuser', 'benchmarkdbpass');
|
|
|
- fDbPool.ArrayParamsAsBinary := true;
|
|
|
+ // fDbPool.ArrayParamsAsBinary := true; // seems not really faster
|
|
|
// customize JSON serialization for TFB expectations
|
|
|
TOrmWorld.OrmProps.Fields.JsonRenameProperties([
|
|
|
'ID', 'id',
|
|
@@ -188,10 +188,10 @@ begin
|
|
|
fStore := TRestServerDB.Create(fModel, SQLITE_MEMORY_DATABASE_NAME);
|
|
|
fStore.NoAjaxJson := true;
|
|
|
fStore.Server.CreateMissingTables; // create SQlite3 virtual tables
|
|
|
- // pre-fill the ORM
|
|
|
+ // ORM and raw caches warmup
|
|
|
if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
|
|
|
fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
|
|
|
- fCachedWorldsTable := fStore.Orm.Cache.Table(TOrmCachedWorld);
|
|
|
+ fOrmCache := fStore.Orm.Cache.Table(TOrmCachedWorld);
|
|
|
fStore.Orm.RetrieveListObjArray(fRawCache, TOrmCachedWorld, 'order by id', []);
|
|
|
// initialize the mustache template for /fortunes
|
|
|
fTemplate := TSynMustache.Parse(FORTUNES_TPL);
|
|
@@ -277,8 +277,7 @@ begin
|
|
|
result := StrComp(pointer(TFortune(A).message), pointer(TFortune(B).message));
|
|
|
end;
|
|
|
|
|
|
-function TRawAsyncServer.ComputeRawFortunes(
|
|
|
- stmt: TSqlDBStatement; ctxt: THttpServerRequest): integer;
|
|
|
+function TRawAsyncServer.ComputeRawFortunes(stmt: TSqlDBStatement): RawUtf8;
|
|
|
var
|
|
|
list: TFortunes;
|
|
|
arr: TDynArray;
|
|
@@ -286,7 +285,7 @@ var
|
|
|
f: ^TFortune;
|
|
|
mus: TSynMustacheContextData;
|
|
|
begin
|
|
|
- result := HTTP_BADREQUEST;
|
|
|
+ result := '';
|
|
|
if stmt = nil then
|
|
|
exit;
|
|
|
arr.Init(TypeInfo(TFortunes), list, @n);
|
|
@@ -303,9 +302,7 @@ begin
|
|
|
mus := stmt.Connection.GetThreadOwned(TSynMustacheContextData);
|
|
|
if mus = nil then
|
|
|
mus := stmt.Connection.SetThreadOwned(fTemplate.NewMustacheContextData);
|
|
|
- ctxt.OutContent := mus.RenderArray(arr);
|
|
|
- ctxt.OutContentType := HTML_CONTENT_TYPE;
|
|
|
- result := HTTP_SUCCESS;
|
|
|
+ result := mus.RenderArray(arr);
|
|
|
end;
|
|
|
|
|
|
// following methods implement the server endpoints
|
|
@@ -363,7 +360,7 @@ begin
|
|
|
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
|
|
|
gen := Lecuyer;
|
|
|
for i := 0 to length(res) - 1 do
|
|
|
- res[i] := fCachedWorldsTable.Get(ComputeRandomWorld(gen));
|
|
|
+ res[i] := fOrmCache.Get(ComputeRandomWorld(gen));
|
|
|
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
@@ -479,7 +476,9 @@ begin
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
|
stmt := conn.NewStatementPrepared(FORTUNES_SQL, true, true);
|
|
|
stmt.ExecutePrepared;
|
|
|
- result := ComputeRawFortunes(stmt.Instance, ctxt);
|
|
|
+ ctxt.OutContent := ComputeRawFortunes(stmt.Instance);
|
|
|
+ ctxt.OutContentType := HTML_CONTENT_TYPE;
|
|
|
+ result := HTTP_SUCCESS;
|
|
|
end;
|
|
|
|
|
|
var
|
|
@@ -519,7 +518,7 @@ function TRawAsyncServer.rawupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
cnt, i: PtrInt;
|
|
|
res: TWorlds;
|
|
|
- ids, nums: TInt64DynArray;
|
|
|
+ params: TInt64DynArray;
|
|
|
gen: PLecuyer;
|
|
|
conn: TSqlDBConnection;
|
|
|
stmt: ISqlDBStatement;
|
|
@@ -536,16 +535,14 @@ begin
|
|
|
if cnt > 20 then
|
|
|
begin
|
|
|
// fill parameters arrays for update with nested select (PostgreSQL only)
|
|
|
- setLength(ids{%H-}, cnt);
|
|
|
- setLength(nums{%H-}, cnt);
|
|
|
- for i := 0 to cnt - 1 do
|
|
|
- begin
|
|
|
- ids[i] := res[i].id;
|
|
|
- nums[i] := res[i].randomNumber;
|
|
|
- end;
|
|
|
stmt := conn.NewStatementPrepared(WORLD_UPDATE_SQLN, false, true);
|
|
|
- stmt.BindArray(1, ids);
|
|
|
- stmt.BindArray(2, nums);
|
|
|
+ SetLength(params{%H-}, cnt);
|
|
|
+ for i := 0 to cnt - 1 do
|
|
|
+ params[i] := res[i].id;
|
|
|
+ stmt.BindArray(1, params);
|
|
|
+ for i := 0 to cnt - 1 do
|
|
|
+ params[i] := res[i].randomNumber;
|
|
|
+ stmt.BindArray(2, params);
|
|
|
end
|
|
|
else
|
|
|
begin
|
|
@@ -573,47 +570,43 @@ begin
|
|
|
with fDbPool.Async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
|
|
|
try
|
|
|
Bind(1, ComputeRandomWorld(Lecuyer));
|
|
|
- ExecuteAsync(ctxt, OnAsyncDb);
|
|
|
+ ExecuteAsync(ctxt.AsyncHandle, OnAsyncDb);
|
|
|
finally
|
|
|
UnLock;
|
|
|
end;
|
|
|
- result := ctxt.SetAsyncResponse;
|
|
|
+ result := HTTP_ASYNCRESPONSE;
|
|
|
end;
|
|
|
|
|
|
procedure TRawAsyncServer.OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
- Context: TObject);
|
|
|
-var
|
|
|
- ctxt: THttpServerRequest absolute Context;
|
|
|
+ Context: PtrInt);
|
|
|
begin
|
|
|
if (Statement = nil) or
|
|
|
not Statement.Step then
|
|
|
- ctxt.ErrorMessage := 'asyncdb failed'
|
|
|
+ fHttpServer.AsyncResponseError(Context, 'asyncdb failed')
|
|
|
else
|
|
|
- ctxt.SetOutJson('{"id":%,"randomNumber":%}',
|
|
|
+ fHttpServer.AsyncResponseFmt(Context, '{"id":%,"randomNumber":%}',
|
|
|
[Statement.ColumnInt(0), Statement.ColumnInt(1)]);
|
|
|
- ctxt.OnAsyncResponse(ctxt);
|
|
|
end;
|
|
|
|
|
|
function TRawAsyncServer.asyncfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
fDbPool.Async.PrepareLocked(FORTUNES_SQL, {res=}true, ASYNC_OPT).
|
|
|
- ExecuteAsyncNoParam(ctxt, OnAsyncFortunes);
|
|
|
- result := ctxt.SetAsyncResponse;
|
|
|
+ ExecuteAsyncNoParam(ctxt.AsyncHandle, OnAsyncFortunes);
|
|
|
+ result := HTTP_ASYNCRESPONSE;
|
|
|
end;
|
|
|
|
|
|
procedure TRawAsyncServer.OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
- Context: TObject);
|
|
|
-var
|
|
|
- ctxt: THttpServerRequest absolute Context;
|
|
|
+ Context: PtrInt);
|
|
|
begin
|
|
|
- ctxt.OnAsyncResponse(ctxt, ComputeRawFortunes(Statement, ctxt));
|
|
|
+ fHttpServer.AsyncResponse(Context, ComputeRawFortunes(Statement), HTML_CONTENT_TYPE);
|
|
|
end;
|
|
|
|
|
|
type
|
|
|
// simple state machine used for /asyncqueries and /asyncupdates
|
|
|
TAsyncWorld = class
|
|
|
public
|
|
|
- request: THttpServerRequest;
|
|
|
+ http: THttpAsyncServer;
|
|
|
+ connection: TConnectionAsyncHandle;
|
|
|
res: TWorlds;
|
|
|
count, current: integer;
|
|
|
update: TSqlDBPostgresAsyncStatement; // prepared before any callback
|
|
@@ -621,8 +614,8 @@ type
|
|
|
function Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
function Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
procedure DoUpdates;
|
|
|
- procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
- procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
+ procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: PtrInt);
|
|
|
+ procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: PtrInt);
|
|
|
end;
|
|
|
|
|
|
function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
|
|
@@ -645,7 +638,8 @@ var
|
|
|
gen: PLecuyer;
|
|
|
select: TSqlDBPostgresAsyncStatement;
|
|
|
begin
|
|
|
- request := ctxt;
|
|
|
+ http := server.fHttpServer;
|
|
|
+ connection := ctxt.AsyncHandle;
|
|
|
if async = nil then
|
|
|
async := server.fDbPool.Async;
|
|
|
if count = 0 then
|
|
@@ -656,26 +650,26 @@ begin
|
|
|
n := count;
|
|
|
gen := Lecuyer;
|
|
|
repeat
|
|
|
- dec(n);
|
|
|
select.Bind(1, ComputeRandomWorld(gen));
|
|
|
+ dec(n);
|
|
|
if n = 0 then // last item should include asoForceConnectionFlush (if set)
|
|
|
opt := ASYNC_OPT;
|
|
|
- select.ExecuteAsync(ctxt, OnQueries, @opt);
|
|
|
+ select.ExecuteAsync(connection, OnQueries, @opt);
|
|
|
until n = 0;
|
|
|
select.UnLock;
|
|
|
- result := ctxt.SetAsyncResponse;
|
|
|
+ result := HTTP_ASYNCRESPONSE;
|
|
|
end;
|
|
|
|
|
|
function TAsyncWorld.Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
async := server.fDbPool.Async;
|
|
|
count := getQueriesParamValue(ctxt);
|
|
|
- update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT);
|
|
|
- result := Queries(server, ctxt);
|
|
|
+ update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT); // to trigger DoUpdates
|
|
|
+ result := Queries(server, ctxt); // will set http and connection fields
|
|
|
end;
|
|
|
|
|
|
procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
- Context: TObject);
|
|
|
+ Context: PtrInt);
|
|
|
begin
|
|
|
if (Statement <> nil) and
|
|
|
Statement.Step then
|
|
@@ -708,14 +702,13 @@ begin
|
|
|
for i := 0 to count - 1 do
|
|
|
params[i] := res[i].randomNumber;
|
|
|
update.BindArrayInt32(2, params);
|
|
|
- update.ExecuteAsync(request, OnRes);
|
|
|
+ update.ExecuteAsync(connection, OnRes);
|
|
|
end;
|
|
|
|
|
|
procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
|
|
|
- Context: TObject);
|
|
|
+ Context: PtrInt);
|
|
|
begin
|
|
|
- request.SetOutJson(@res, TypeInfo(TWorlds));
|
|
|
- request.OnAsyncResponse(Context as THttpServerRequest);
|
|
|
+ http.AsyncResponseJson(Context, @res, TypeInfo(TWorlds));
|
|
|
Free; // we don't need this state machine any more
|
|
|
end;
|
|
|
|
|
@@ -733,6 +726,9 @@ begin
|
|
|
TSynLog.Family.HighResolutionTimestamp := true;
|
|
|
TSynLog.Family.PerThreadLog := ptIdentifiedInOneFile;
|
|
|
TSynLog.Family.AutoFlushTimeOut := 1;
|
|
|
+ TSynLog.Family.RotateFileCount := 10;
|
|
|
+ TSynLog.Family.RotateFileSizeKB := 500000;
|
|
|
+ LogCompressAlgo := nil; // keep 10 x 512MB uncompressed files
|
|
|
{$else}
|
|
|
SynDBLog := nil; // slightly faster: no need to check log level
|
|
|
{$endif WITH_LOGS}
|
|
@@ -745,27 +741,20 @@ begin
|
|
|
|
|
|
// compute default execution context from HW information
|
|
|
cpuCount := CurrentCpuSet(cpuMask); // may run from a "taskset" command
|
|
|
- if cpuCount >= 6 then
|
|
|
+ if GetEnvironmentVariable('TFB_TEST_NAME') = 'mormot-postgres-async' then
|
|
|
+ begin
|
|
|
+ // asynchronous tests do not require several listeners
|
|
|
+ servers := 1;
|
|
|
+ threads := cpucount * 4;
|
|
|
+ pinServers2Cores := false;
|
|
|
+ end
|
|
|
+ else if cpuCount >= 6 then
|
|
|
begin
|
|
|
// high-end CPU would scale better using several listeners (one per core)
|
|
|
// see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
|
|
|
servers := cpuCount;
|
|
|
threads := 8;
|
|
|
pinServers2Cores := true;
|
|
|
- if GetEnvironmentVariable('TFB_TEST_NAME') = 'mormot-postgres-async' then
|
|
|
- begin
|
|
|
- // asynchronus test
|
|
|
- servers := cpuCount;
|
|
|
- threads := 8;
|
|
|
- end
|
|
|
- else
|
|
|
- if GetEnvironmentVariable('TFB_TEST_NAME') = 'mormot-postgres-async2' then
|
|
|
- begin
|
|
|
- // asynchronus test with single listener socket and no CPU pinning
|
|
|
- servers := 1;
|
|
|
- threads := cpuCount * 4;
|
|
|
- pinServers2Cores := false;
|
|
|
- end;
|
|
|
end
|
|
|
else
|
|
|
begin
|
|
@@ -778,13 +767,12 @@ begin
|
|
|
// parse command line parameters
|
|
|
with Executable.Command do
|
|
|
begin
|
|
|
- ExeDescription := 'TFB Server using mORMot 2';
|
|
|
- if Option(['p', 'pin'], 'pin each server to a CPU') then
|
|
|
+ if Option('&pin', 'pin each server to a CPU') then
|
|
|
pinServers2Cores := true;
|
|
|
if Option('nopin', 'disable the CPU pinning') then
|
|
|
pinServers2Cores := false; // no option would keep the default boolean
|
|
|
- Get(['s', 'servers'], servers, '#count of servers (listener sockets)', servers);
|
|
|
- Get(['t', 'threads'], threads, 'per-server thread pool #size', threads);
|
|
|
+ Get('&servers', servers, '#count of servers (listener sockets)', servers);
|
|
|
+ Get('&threads', threads, 'per-server thread pool #size', threads);
|
|
|
if ConsoleHelpFailed('TFB Server using mORMot 2') then
|
|
|
exit;
|
|
|
end;
|
|
@@ -809,42 +797,41 @@ begin
|
|
|
if GetBit(cpuMask, cpuIdx) then
|
|
|
dec(k);
|
|
|
until k = -1;
|
|
|
- writeln('Pin #', i, ' server to #', cpuIdx, ' CPU');
|
|
|
+ ConsoleWrite(['Pin #', i, ' server to #', cpuIdx, ' CPU']);
|
|
|
end;
|
|
|
rawServers[i] := TRawAsyncServer.Create(threads, flags, cpuIdx)
|
|
|
end;
|
|
|
|
|
|
try
|
|
|
// display some information and wait for SIGTERM
|
|
|
- writeln;
|
|
|
- writeln(rawServers[0].fHttpServer.ClassName,
|
|
|
- ' running on localhost:', rawServers[0].fHttpServer.SockPort);
|
|
|
- writeln(' num servers=', servers,
|
|
|
- ', threads per server=', threads,
|
|
|
- ', total threads=', threads * servers,
|
|
|
- ', total CPU=', SystemInfo.dwNumberOfProcessors,
|
|
|
- ', accessible CPU=', cpuCount,
|
|
|
- ', pinned=', pinServers2Cores,
|
|
|
- ', db=', rawServers[0].fDbPool.DbmsEngineName);
|
|
|
- writeln(' options=', GetSetName(TypeInfo(THttpServerOptions), flags));
|
|
|
- writeln('Press [Enter] or Ctrl+C or send SIGTERM to terminate');
|
|
|
+ ConsoleWrite([CRLF, rawServers[0].fHttpServer.ClassName,
|
|
|
+ ' running on localhost:', rawServers[0].fHttpServer.SockPort], ccWhite);
|
|
|
+ ConsoleWrite([' num servers=', servers,
|
|
|
+ ', threads per server=', threads,
|
|
|
+ ', total threads=', threads * servers,
|
|
|
+ ', total CPU=', SystemInfo.dwNumberOfProcessors,
|
|
|
+ ', accessible CPU=', cpuCount,
|
|
|
+ ', pinned=', pinServers2Cores,
|
|
|
+ ', db=', rawServers[0].fDbPool.DbmsEngineName, CRLF,
|
|
|
+ ' options=', GetSetName(TypeInfo(THttpServerOptions), flags), CRLF]);
|
|
|
+ ConsoleWrite('Press [Enter] or Ctrl+C or send SIGTERM to terminate', ccWhite);
|
|
|
ConsoleWaitForEnterKey;
|
|
|
//TSynLog.Family.Level := LOG_VERBOSE; // enable shutdown logs for debug
|
|
|
if servers = 1 then
|
|
|
- writeln(ObjectToJsonDebug(rawServers[0].fHttpServer))
|
|
|
+ ConsoleObject(rawServers[0].fHttpServer)
|
|
|
else
|
|
|
begin
|
|
|
- writeln('Per-server accepted connections:');
|
|
|
+ ConsoleWrite('Per-server accepted connections:');
|
|
|
for i := 0 to servers - 1 do
|
|
|
- write(' ', rawServers[i].fHttpServer.Async.Accepted);
|
|
|
- writeln(#10'Please wait: Shutdown ', servers, ' servers and ',
|
|
|
- threads * servers, ' threads');
|
|
|
+ ConsoleWrite([' ', rawServers[i].fHttpServer.Async.Accepted], ccLightGray, true);
|
|
|
+ ConsoleWrite([#10'Please wait: Shutdown ', servers, ' servers and ',
|
|
|
+ threads * servers, ' threads']);
|
|
|
end;
|
|
|
finally
|
|
|
// clear all server instance(s)
|
|
|
ObjArrayClear(rawServers);
|
|
|
end;
|
|
|
- write('Shutdown complete'#10);
|
|
|
+ ConsoleWrite('Shutdown complete'#10);
|
|
|
{$ifdef FPC_X64MM}
|
|
|
WriteHeapStatus(' ', 16, 8, {compileflags=}true);
|
|
|
{$endif FPC_X64MM}
|