|
@@ -54,7 +54,7 @@ type
|
|
|
end;
|
|
|
TWorlds = array of TWorldRec;
|
|
|
TFortune = packed record
|
|
|
- id: integer;
|
|
|
+ id: PtrUInt;
|
|
|
message: PUtf8Char;
|
|
|
end;
|
|
|
TFortunes = array of TFortune;
|
|
@@ -86,6 +86,7 @@ type
|
|
|
fStore: TRestServerDB;
|
|
|
fTemplate: TSynMustache;
|
|
|
fCachedWorldsTable: POrmCacheTable;
|
|
|
+ fRawCache: TOrmWorlds;
|
|
|
fDbPool: TSqlDBPostgresConnectionProperties;
|
|
|
procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
@@ -107,6 +108,7 @@ type
|
|
|
function updates(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawdb(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawqueries(ctxt: THttpServerRequest): cardinal;
|
|
|
+ function rawcached(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
function rawupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
// asynchronous PostgreSQL pipelined DB access
|
|
@@ -144,13 +146,13 @@ const
|
|
|
'</html>';
|
|
|
|
|
|
|
|
|
-function ComputeRandomWorld: integer; inline;
|
|
|
+function ComputeRandomWorld(gen: PLecuyer): integer; inline;
|
|
|
begin
|
|
|
- result := Random32(WORLD_COUNT) + 1;
|
|
|
+ result := gen^.Next(WORLD_COUNT) + 1;
|
|
|
end;
|
|
|
|
|
|
function GetQueriesParamValue(ctxt: THttpServerRequest;
|
|
|
- const search: RawUtf8 = 'QUERIES='): cardinal;
|
|
|
+ const search: RawUtf8 = 'QUERIES='): cardinal; inline;
|
|
|
begin
|
|
|
if not ctxt.UrlParam(search, result) or
|
|
|
(result = 0) then
|
|
@@ -190,6 +192,7 @@ begin
|
|
|
if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
|
|
|
fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
|
|
|
fCachedWorldsTable := fStore.Orm.Cache.Table(TOrmCachedWorld);
|
|
|
+ fStore.Orm.RetrieveListObjArray(fRawCache, TOrmCachedWorld, 'order by id', []);
|
|
|
// initialize the mustache template for /fortunes
|
|
|
fTemplate := TSynMustache.Parse(FORTUNES_TPL);
|
|
|
// setup the HTTP server
|
|
@@ -222,22 +225,26 @@ begin
|
|
|
fHttpServer.Free;
|
|
|
fStore.Free;
|
|
|
fModel.Free;
|
|
|
- fDBPool.free;
|
|
|
+ fDBPool.Free;
|
|
|
+ ObjArrayClear(fRawCache);
|
|
|
inherited Destroy;
|
|
|
end;
|
|
|
|
|
|
// query DB world table for /rawqueries and /rawupdates endpoints
|
|
|
|
|
|
-function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
|
|
|
+function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt;
|
|
|
+ out res: TWorlds): boolean;
|
|
|
var
|
|
|
conn: TSqlDBConnection;
|
|
|
stmt: ISqlDBStatement;
|
|
|
pConn: TSqlDBPostgresConnection absolute conn;
|
|
|
pStmt: TSqlDBPostgresStatement;
|
|
|
+ gen: PLecuyer;
|
|
|
i: PtrInt;
|
|
|
begin
|
|
|
result := false;
|
|
|
SetLength(res{%H-}, cnt);
|
|
|
+ gen := Lecuyer;
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
|
// specific code to use PostgresSQL pipelining mode
|
|
|
// see test_nosync in
|
|
@@ -247,7 +254,7 @@ begin
|
|
|
pStmt := TSqlDBPostgresStatement(stmt.Instance);
|
|
|
for i := 0 to cnt - 1 do
|
|
|
begin
|
|
|
- pStmt.Bind(1, ComputeRandomWorld);
|
|
|
+ pStmt.Bind(1, ComputeRandomWorld(gen));
|
|
|
pStmt.SendPipelinePrepared;
|
|
|
pConn.PipelineSync;
|
|
|
end;
|
|
@@ -323,7 +330,7 @@ function TRawAsyncServer.db(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
w: TOrmWorld;
|
|
|
begin
|
|
|
- w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
|
|
|
+ w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(Lecuyer));
|
|
|
try
|
|
|
ctxt.SetOutJson(w);
|
|
|
result := HTTP_SUCCESS;
|
|
@@ -336,10 +343,12 @@ function TRawAsyncServer.queries(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
i: PtrInt;
|
|
|
res: TOrmWorlds;
|
|
|
+ gen: PLecuyer;
|
|
|
begin
|
|
|
SetLength(res, GetQueriesParamValue(ctxt, 'QUERIES='));
|
|
|
+ gen := Lecuyer;
|
|
|
for i := 0 to length(res) - 1 do
|
|
|
- res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
|
|
|
+ res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(gen));
|
|
|
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
|
|
|
ObjArrayClear(res);
|
|
|
result := HTTP_SUCCESS;
|
|
@@ -349,10 +358,12 @@ function TRawAsyncServer.cached_queries(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
i: PtrInt;
|
|
|
res: TOrmWorlds;
|
|
|
+ gen: PLecuyer;
|
|
|
begin
|
|
|
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
|
|
|
+ gen := Lecuyer;
|
|
|
for i := 0 to length(res) - 1 do
|
|
|
- res[i] := fCachedWorldsTable.Get(ComputeRandomWorld);
|
|
|
+ res[i] := fCachedWorldsTable.Get(ComputeRandomWorld(gen));
|
|
|
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
@@ -390,19 +401,21 @@ var
|
|
|
res: TOrmWorlds;
|
|
|
w: TOrmWorld;
|
|
|
b: TRestBatch;
|
|
|
+ gen: PLecuyer;
|
|
|
begin
|
|
|
result := HTTP_SERVERERROR;
|
|
|
SetLength(res, GetQueriesParamValue(ctxt));
|
|
|
b := TRestBatch.Create(fStore.ORM, TOrmWorld, {transrows=}0,
|
|
|
[boExtendedJson, boNoModelEncoding, boPutNoCacheFlush]);
|
|
|
try
|
|
|
+ gen := Lecuyer;
|
|
|
for i := 0 to length(res) - 1 do
|
|
|
begin
|
|
|
w := TOrmWorld.Create;
|
|
|
res[i] := w;
|
|
|
- if not fStore.Orm.Retrieve(ComputeRandomWorld, w) then
|
|
|
+ if not fStore.Orm.Retrieve(ComputeRandomWorld(gen), w) then
|
|
|
exit;
|
|
|
- w.RandomNumber := ComputeRandomWorld;
|
|
|
+ w.RandomNumber := ComputeRandomWorld(gen);
|
|
|
b.Update(w);
|
|
|
end;
|
|
|
result := b.Send;
|
|
@@ -422,7 +435,7 @@ begin
|
|
|
result := HTTP_SERVERERROR;
|
|
|
conn := fDbPool.ThreadSafeConnection;
|
|
|
stmt := conn.NewStatementPrepared(WORLD_READ_SQL, true, true);
|
|
|
- stmt.Bind(1, ComputeRandomWorld);
|
|
|
+ stmt.Bind(1, ComputeRandomWorld(Lecuyer));
|
|
|
stmt.ExecutePrepared;
|
|
|
if stmt.Step then
|
|
|
begin
|
|
@@ -444,6 +457,20 @@ begin
|
|
|
result := HTTP_SUCCESS;
|
|
|
end;
|
|
|
|
|
|
+function TRawAsyncServer.rawcached(ctxt: THttpServerRequest): cardinal;
|
|
|
+var
|
|
|
+ i: PtrInt;
|
|
|
+ res: TOrmWorlds;
|
|
|
+ gen: PLecuyer;
|
|
|
+begin
|
|
|
+ SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
|
|
|
+ gen := Lecuyer;
|
|
|
+ for i := 0 to length(res) - 1 do
|
|
|
+ res[i] := fRawCache[ComputeRandomWorld(gen) - 1];
|
|
|
+ ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
|
|
|
+ result := HTTP_SUCCESS;
|
|
|
+end;
|
|
|
+
|
|
|
function TRawAsyncServer.rawfortunes(ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
conn: TSqlDBConnection;
|
|
@@ -476,10 +503,7 @@ begin
|
|
|
try
|
|
|
W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
|
|
|
for i := 1 to cnt do
|
|
|
- begin
|
|
|
- W.AddShort('(?::integer, ?::integer)');
|
|
|
- W.Add(',');
|
|
|
- end;
|
|
|
+ W.AddShort('(?::integer, ?::integer),');
|
|
|
W.CancelLastComma;
|
|
|
W.AddShort(' order by 1) AS v (id, randomNumber) WHERE world.id = v.id');
|
|
|
W.SetText(LastComputeUpdateSql);
|
|
@@ -496,6 +520,7 @@ var
|
|
|
cnt, i: PtrInt;
|
|
|
res: TWorlds;
|
|
|
ids, nums: TInt64DynArray;
|
|
|
+ gen: PLecuyer;
|
|
|
conn: TSqlDBConnection;
|
|
|
stmt: ISqlDBStatement;
|
|
|
begin
|
|
@@ -505,8 +530,9 @@ begin
|
|
|
if not getRawRandomWorlds(cnt, res) then
|
|
|
exit;
|
|
|
// generate new randoms
|
|
|
+ gen := Lecuyer;
|
|
|
for i := 0 to cnt - 1 do
|
|
|
- res[i].randomNumber := ComputeRandomWorld;
|
|
|
+ res[i].randomNumber := ComputeRandomWorld(gen);
|
|
|
if cnt > 20 then
|
|
|
begin
|
|
|
// fill parameters arrays for update with nested select (PostgreSQL only)
|
|
@@ -546,7 +572,7 @@ function TRawAsyncServer.asyncdb(ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
with fDbPool.Async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
|
|
|
try
|
|
|
- Bind(1, ComputeRandomWorld);
|
|
|
+ Bind(1, ComputeRandomWorld(Lecuyer));
|
|
|
ExecuteAsync(ctxt, OnAsyncDb);
|
|
|
finally
|
|
|
UnLock;
|
|
@@ -591,8 +617,9 @@ type
|
|
|
res: TWorlds;
|
|
|
count, current: integer;
|
|
|
update: TSqlDBPostgresAsyncStatement; // prepared before any callback
|
|
|
- function Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
|
|
|
- function Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
|
|
|
+ async: TSqlDBPostgresAsync;
|
|
|
+ function Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
+ function Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
procedure DoUpdates;
|
|
|
procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
|
procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
|
|
@@ -600,48 +627,51 @@ type
|
|
|
|
|
|
function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
- result := TAsyncWorld.Create.Queries(fDBPool.Async, ctxt);
|
|
|
+ result := TAsyncWorld.Create.Queries(self, ctxt);
|
|
|
end;
|
|
|
|
|
|
function TRawAsyncServer.asyncupdates(ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
- result := TAsyncWorld.Create.Updates(fDBPool.Async, ctxt);
|
|
|
+ result := TAsyncWorld.Create.Updates(self, ctxt);
|
|
|
end;
|
|
|
|
|
|
|
|
|
{ TAsyncWorld }
|
|
|
|
|
|
-function TAsyncWorld.Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
|
|
|
+function TAsyncWorld.Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
var
|
|
|
n: integer;
|
|
|
- opt: TSqlDBPostgresAsyncStatementOptions; // for modified libpq
|
|
|
+ opt: TSqlDBPostgresAsyncStatementOptions; // forced options for modified libpq
|
|
|
+ gen: PLecuyer;
|
|
|
+ select: TSqlDBPostgresAsyncStatement;
|
|
|
begin
|
|
|
request := ctxt;
|
|
|
+ if async = nil then
|
|
|
+ async := server.fDbPool.Async;
|
|
|
if count = 0 then
|
|
|
count := getQueriesParamValue(ctxt);
|
|
|
SetLength(res, count); // count is > 0
|
|
|
- with async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
|
|
|
- try
|
|
|
- opt := AsyncOptions - [asoForceConnectionFlush];
|
|
|
- n := count;
|
|
|
- repeat
|
|
|
- dec(n);
|
|
|
- Bind(1, ComputeRandomWorld);
|
|
|
- if n = 0 then // last item should include asoForceConnectionFlush (if set)
|
|
|
- opt := AsyncOptions;
|
|
|
- ExecuteAsync(ctxt, OnQueries, @opt);
|
|
|
- until n = 0;
|
|
|
- finally
|
|
|
- UnLock;
|
|
|
- end;
|
|
|
+ select := async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT);
|
|
|
+ opt := ASYNC_OPT - [asoForceConnectionFlush];
|
|
|
+ n := count;
|
|
|
+ gen := Lecuyer;
|
|
|
+ repeat
|
|
|
+ dec(n);
|
|
|
+ select.Bind(1, ComputeRandomWorld(gen));
|
|
|
+ if n = 0 then // last item should include asoForceConnectionFlush (if set)
|
|
|
+ opt := ASYNC_OPT;
|
|
|
+ select.ExecuteAsync(ctxt, OnQueries, @opt);
|
|
|
+ until n = 0;
|
|
|
+ select.UnLock;
|
|
|
result := ctxt.SetAsyncResponse;
|
|
|
end;
|
|
|
|
|
|
-function TAsyncWorld.Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
|
|
|
+function TAsyncWorld.Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
|
|
|
begin
|
|
|
+ async := server.fDbPool.Async;
|
|
|
count := getQueriesParamValue(ctxt);
|
|
|
update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT);
|
|
|
- result := Queries(async, ctxt);
|
|
|
+ result := Queries(server, ctxt);
|
|
|
end;
|
|
|
|
|
|
procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
|
|
@@ -666,9 +696,11 @@ procedure TAsyncWorld.DoUpdates;
|
|
|
var
|
|
|
i: PtrInt;
|
|
|
params: TIntegerDynArray;
|
|
|
+ gen: PLecuyer;
|
|
|
begin
|
|
|
+ gen := Lecuyer;
|
|
|
for i := 0 to count - 1 do
|
|
|
- res[i].randomNumber := ComputeRandomWorld;
|
|
|
+ res[i].randomNumber := ComputeRandomWorld(gen);
|
|
|
SetLength(params, count);
|
|
|
for i := 0 to count - 1 do
|
|
|
params[i] := res[i].id;
|
|
@@ -708,8 +740,8 @@ begin
|
|
|
// register some RTTI for records JSON serialization
|
|
|
Rtti.RegisterFromText([
|
|
|
TypeInfo(TMessageRec), 'message:PUtf8Char',
|
|
|
- TypeInfo(TWorldRec), 'id,randomNumber:integer',
|
|
|
- TypeInfo(TFortune), 'id:integer message:PUtf8Char']);
|
|
|
+ TypeInfo(TWorldRec), 'id,randomNumber:cardinal',
|
|
|
+ TypeInfo(TFortune), 'id:PtrUInt message:PUtf8Char']);
|
|
|
|
|
|
// compute default execution context from HW information
|
|
|
cpuCount := CurrentCpuSet(cpuMask); // may run from a "taskset" command
|
|
@@ -731,7 +763,7 @@ begin
|
|
|
begin
|
|
|
// asynchronus test with single listener socket and no CPU pinning
|
|
|
servers := 1;
|
|
|
- threads := cpuCount * 2;
|
|
|
+ threads := cpuCount * 4;
|
|
|
pinServers2Cores := false;
|
|
|
end;
|
|
|
end
|
|
@@ -753,12 +785,7 @@ begin
|
|
|
pinServers2Cores := false; // no option would keep the default boolean
|
|
|
Get(['s', 'servers'], servers, '#count of servers (listener sockets)', servers);
|
|
|
Get(['t', 'threads'], threads, 'per-server thread pool #size', threads);
|
|
|
- if Option(['?', 'help'], 'display this message') then
|
|
|
- begin
|
|
|
- ConsoleWrite(FullDescription);
|
|
|
- exit;
|
|
|
- end;
|
|
|
- if ConsoleWriteUnknown then
|
|
|
+ if ConsoleHelpFailed('TFB Server using mORMot 2') then
|
|
|
exit;
|
|
|
end;
|
|
|
|