Эх сурвалжийг харах

[mORMot] use CPU pinning to minimize cpu-migrations (#8153)

Co-authored-by: pavel.mash <[email protected]>
pavelmash 2 жил өмнө
parent
commit
b82b4ce88f

+ 1 - 1
frameworks/Pascal/mormot/setup_and_build.sh

@@ -35,7 +35,7 @@ echo "Download statics from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot/static
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot/static
 
 
 # uncomment for fixed commit URL
 # uncomment for fixed commit URL
-URL=https://github.com/synopse/mORMot2/tarball/08dd4a1898fa2e66b659f0679f7a038b3b2f2c65
+URL=https://github.com/synopse/mORMot2/tarball/f430300300ed51f128b32036423d920032628159
 #URL="https://api.github.com/repos/synopse/mORMot2/tarball/$USED_TAG"
 #URL="https://api.github.com/repos/synopse/mORMot2/tarball/$USED_TAG"
 echo "Download and unpacking mORMot sources from $URL ..."
 echo "Download and unpacking mORMot sources from $URL ..."
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot  --strip-components=1
 wget -qO- "$URL" | tar -xz -C ./libs/mORMot  --strip-components=1

+ 106 - 41
frameworks/Pascal/mormot/src/raw.pas

@@ -90,7 +90,8 @@ type
     // as used by /rawqueries and /rawupdates
     // as used by /rawqueries and /rawupdates
     function GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
     function GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
   public
   public
-    constructor Create(threadCount: integer; flags: THttpServerOptions); reintroduce;
+    constructor Create(threadCount: integer; flags: THttpServerOptions;
+      pin2Core: integer = -1); reintroduce;
     destructor Destroy; override;
     destructor Destroy; override;
   published
   published
     // all service URI are implemented by these published methods using RTTI
     // all service URI are implemented by these published methods using RTTI
@@ -107,16 +108,18 @@ type
     function rawupdates(ctxt: THttpServerRequest): cardinal;
     function rawupdates(ctxt: THttpServerRequest): cardinal;
   end;
   end;
 
 
+{$I-}
+
 const
 const
   HELLO_WORLD: RawUtf8 = 'Hello, World!';
   HELLO_WORLD: RawUtf8 = 'Hello, World!';
   TEXT_CONTENT_TYPE_NO_ENCODING: RawUtf8 = 'text/plain';
   TEXT_CONTENT_TYPE_NO_ENCODING: RawUtf8 = 'text/plain';
 
 
-  WORLD_COUNT      = 10000;
-  WORLD_READ_SQL   = 'select id,randomNumber from World where id=?';
+  WORLD_COUNT       = 10000;
+  WORLD_READ_SQL    = 'select id,randomNumber from World where id=?';
   WORLD_UPDATE_SQLN ='update World as t set randomNumber = v.r from ' +
   WORLD_UPDATE_SQLN ='update World as t set randomNumber = v.r from ' +
     '(SELECT unnest(?::bigint[]), unnest(?::bigint[]) order by 1) as v(id, r)' +
     '(SELECT unnest(?::bigint[]), unnest(?::bigint[]) order by 1) as v(id, r)' +
     ' where t.id = v.id';
     ' where t.id = v.id';
-  FORTUNES_SQL     = 'select id,message from Fortune';
+  FORTUNES_SQL      = 'select id,message from Fortune';
 
 
   FORTUNES_MESSAGE = 'Additional fortune added at request time.';
   FORTUNES_MESSAGE = 'Additional fortune added at request time.';
   FORTUNES_TPL     = '<!DOCTYPE html>' +
   FORTUNES_TPL     = '<!DOCTYPE html>' +
@@ -152,7 +155,7 @@ end;
 { TRawAsyncServer }
 { TRawAsyncServer }
 
 
 constructor TRawAsyncServer.Create(
 constructor TRawAsyncServer.Create(
-  threadCount: integer; flags: THttpServerOptions);
+  threadCount: integer; flags: THttpServerOptions; pin2Core: integer);
 begin
 begin
   inherited Create;
   inherited Create;
   fDbPool := TSqlDBPostgresConnectionProperties.Create(
   fDbPool := TSqlDBPostgresConnectionProperties.Create(
@@ -192,6 +195,8 @@ begin
      {$endif WITH_LOGS}
      {$endif WITH_LOGS}
      hsoIncludeDateHeader  // required by TPW General Test Requirements #5
      hsoIncludeDateHeader  // required by TPW General Test Requirements #5
     ] + flags);
     ] + flags);
+  if pin2Core <> -1 then
+    fHttpServer.Async.SetCpuAffinity(pin2Core);
   fHttpServer.HttpQueueLength := 10000; // needed e.g. from wrk/ab benchmarks
   fHttpServer.HttpQueueLength := 10000; // needed e.g. from wrk/ab benchmarks
   fHttpServer.ServerName := 'M';
   fHttpServer.ServerName := 'M';
   // use default routing using RTTI on the TRawAsyncServer published methods
   // use default routing using RTTI on the TRawAsyncServer published methods
@@ -234,7 +239,7 @@ begin
     pStmt.SendPipelinePrepared;
     pStmt.SendPipelinePrepared;
     pConn.PipelineSync;
     pConn.PipelineSync;
   end;
   end;
-  pConn.Flush; // we use modified libpq what not flush inside PQPipelineSync - flush manually
+  pConn.Flush; // in case we use modified libpq what not flush inside PQPipelineSync - flush manually
   for i := 0 to cnt - 1 do
   for i := 0 to cnt - 1 do
   begin
   begin
     pStmt.GetPipelineResult;
     pStmt.GetPipelineResult;
@@ -444,7 +449,7 @@ begin
     // update table set .. from values (), (), ... where id = id
     // update table set .. from values (), (), ... where id = id
     // we won't include it in the ORM but only for our RAW results
     // we won't include it in the ORM but only for our RAW results
     LastComputeUpdateSqlCnt := cnt;
     LastComputeUpdateSqlCnt := cnt;
-    W := TTextWriter.CreateOwnedStream(tmp);
+    W := TTextWriter.CreateOwnedStream(tmp{%H-});
     try
     try
       W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
       W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
       for i := 1 to cnt do begin
       for i := 1 to cnt do begin
@@ -511,55 +516,90 @@ end;
 
 
 var
 var
   rawServers: array of TRawAsyncServer;
   rawServers: array of TRawAsyncServer;
-  threads, servers, i: integer;
+  threads, servers, i, k, cpuIdx: integer;
+  pinServers2Cores: boolean;
+  cpuMask: TCpuSet;
+  accessibleCPUCount: PtrInt;
   flags: THttpServerOptions;
   flags: THttpServerOptions;
 
 
-procedure ComputeExecutionContextFromParams;
+function FindCmdLineSwitchVal(const Switch: string; out Value: string): Boolean;
+// scan the command line, last parameter first, for Switch (case-insensitive);
+// Value receives the parameter following the switch, or '' if there is none
+var
+  idx: integer;
+  wanted, arg: string;
+begin
+  Value := '';
+  wanted := UpperCase(Switch);
+  for idx := ParamCount downto 1 do
+  begin
+    arg := ParamStr(idx);
+    if (arg = '') or not (arg[1] in SwitchChars) then
+      continue; // not a switch at all
+    if UpperCase(Copy(arg, 2, Length(arg) - 1)) <> wanted then
+      continue; // a switch, but not the one we are looking for
+    // the value, if any, is the very next parameter
+    if idx <> ParamCount then
+      Value := ParamStr(idx + 1);
+    Result := True;
+    exit;
+  end;
+  Result := False;
+end;
+
+procedure ComputeExecutionContextFromParams(cpusAccessible: PtrInt);
 var
 var
-  cores: integer;
+  sw: string;
 begin
 begin
-  // user specified some values at command line: raw [threads] [cores] [servers]
-  // in practice, [cores] is just ignored
-  if not TryStrToInt(ParamStr(1), threads) then
-    threads := SystemInfo.dwNumberOfProcessors * 4;
-  if not TryStrToInt(ParamStr(2), cores) then
-    cores := 16;
-  if not TryStrToInt(ParamStr(3), servers) then
+  // user specified some values at command line: raw [-s serversCount] [-t threadsPerServer] [-p]
+  if not FindCmdLineSwitchVal('t', sw) or not TryStrToInt(sw, threads) then
+    threads := cpusAccessible * 4;
+  if not FindCmdLineSwitchVal('s', sw) or not TryStrToInt(sw, servers) then
     servers := 1;
     servers := 1;
-  if threads < 2 then
-    threads := 2
+  pinServers2Cores := FindCmdLineSwitch('p', true) or FindCmdLineSwitch('-pin', true);
+  if threads < 1 then
+    threads := 1
   else if threads > 256 then
   else if threads > 256 then
     threads := 256; // max. threads for THttpAsyncServer
     threads := 256; // max. threads for THttpAsyncServer
-  {if SystemInfo.dwNumberOfProcessors > cores then
-    SystemInfo.dwNumberOfProcessors := cores; // for hsoThreadCpuAffinity}
+
   if servers < 1 then
   if servers < 1 then
     servers := 1
     servers := 1
   else if servers > 256 then
   else if servers > 256 then
     servers := 256;
     servers := 256;
 end;
 end;
 
 
-procedure ComputeExecutionContextFromNumberOfProcessors;
-var
-  logicalcores: integer;
+procedure ComputeExecutionContextFromNumberOfProcessors(cpusAccessible: PtrInt);
 begin
 begin
   // automatically guess best parameters depending on available CPU cores
   // automatically guess best parameters depending on available CPU cores
-  logicalcores := SystemInfo.dwNumberOfProcessors;
-  if logicalcores >= 12 then
+  if cpusAccessible >= 6 then
   begin
   begin
-    // high-end CPU - scale using several listeners (one per core)
+    // scale using several listeners (one per core)
     // see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
     // see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
-    servers := logicalcores;
+    servers := cpusAccessible;
     threads := 8;
     threads := 8;
+    pinServers2Cores := true;
   end
   end
   else
   else
   begin
   begin
-    // regular CPU - a single instance and a few threads per core
+    // low-level CPU - a single instance and a few threads per core
     servers := 1;
     servers := 1;
-    threads := logicalcores * 4;
+    threads := cpusAccessible * 4;
+    pinServers2Cores := false;
   end;
   end;
 end;
 end;
 
 
 begin
 begin
+  if FindCmdLineSwitch('?') or FindCmdLineSwitch('h') or FindCmdLineSwitch('-help', ['-'], false) then
+  begin
+    writeln('Usage: ' + UTF8ToString(ExeVersion.ProgramName) + ' [-s serversCount] [-t threadsPerServer] [-p]');
+    writeln('Options:');
+    writeln('  -?, --help            displays this message');
+    writeln('  -s  serversCount      count of servers (listener sockets)');
+    writeln('  -t  threadsPerServer  per-server thread poll size');
+    writeln('  -p, --pin             pin each server to CPU starting from 0');
+    exit;
+  end;
+
   // setup logs
   // setup logs
   {$ifdef WITH_LOGS}
   {$ifdef WITH_LOGS}
   TSynLog.Family.Level := LOG_VERBOSE; // disable logs for benchmarking
   TSynLog.Family.Level := LOG_VERBOSE; // disable logs for benchmarking
@@ -577,35 +617,61 @@ begin
     TypeInfo(TFortune),    'id:integer message:RawUtf8']);
     TypeInfo(TFortune),    'id:integer message:RawUtf8']);
 
 
   // setup execution context
   // setup execution context
-  if ParamCount > 1 then
-    ComputeExecutionContextFromParams
+  accessibleCPUCount := CurrentCpuSet(cpuMask);
+
+  if ParamCount > 0 then
+    ComputeExecutionContextFromParams(accessibleCPUCount)
   else
   else
-    ComputeExecutionContextFromNumberOfProcessors;
+    ComputeExecutionContextFromNumberOfProcessors(accessibleCPUCount);
+  flags := [];
   if servers > 1 then
   if servers > 1 then
     include(flags, hsoReusePort); // allow several bindings on the same port
     include(flags, hsoReusePort); // allow several bindings on the same port
 
 
   // start the server instance(s), in hsoReusePort mode if needed
   // start the server instance(s), in hsoReusePort mode if needed
-  SetLength(rawServers, servers);
-  for i := 0 to servers - 1 do
-    rawServers[i] := TRawAsyncServer.Create(threads, flags);
+  SetLength(rawServers{%H-}, servers);
+  cpuIdx := -1; // do not pin to CPU by default
+  for i := 0 to servers - 1 do begin
+    if pinServers2Cores then
+    begin
+      k := i mod accessibleCPUCount;
+      cpuIdx := -1;
+      // find real CPU index according to the cpuMask
+      repeat
+        inc(cpuIdx);
+        if GetBit(cpuMask, cpuIdx) then
+          dec(k);
+      until k = -1;
+      writeln('Pin server #', i, ' to #', cpuIdx, ' CPU');
+    end;
+    rawServers[i] := TRawAsyncServer.Create(threads, flags, cpuIdx)
+  end;
+
   try
   try
     // display some information and wait for SIGTERM
     // display some information and wait for SIGTERM
-    {$I-}
     writeln;
     writeln;
     writeln(rawServers[0].fHttpServer.ClassName,
     writeln(rawServers[0].fHttpServer.ClassName,
      ' running on localhost:', rawServers[0].fHttpServer.SockPort);
      ' running on localhost:', rawServers[0].fHttpServer.SockPort);
     writeln(' num thread=', threads,
     writeln(' num thread=', threads,
-            ', num CPU=', SystemInfo.dwNumberOfProcessors,
+            ', total CPU=', SystemInfo.dwNumberOfProcessors,
+            ', accessible CPU=', accessibleCPUCount,
             ', num servers=', servers,
             ', num servers=', servers,
+            ', pinned=', pinServers2Cores,
             ', total workers=', threads * servers,
             ', total workers=', threads * servers,
             ', db=', rawServers[0].fDbPool.DbmsEngineName);
             ', db=', rawServers[0].fDbPool.DbmsEngineName);
     writeln(' options=', GetSetName(TypeInfo(THttpServerOptions), flags));
     writeln(' options=', GetSetName(TypeInfo(THttpServerOptions), flags));
     writeln('Press [Enter] or Ctrl+C or send SIGTERM to terminate'#10);
     writeln('Press [Enter] or Ctrl+C or send SIGTERM to terminate'#10);
     ConsoleWaitForEnterKey;
     ConsoleWaitForEnterKey;
     //TSynLog.Family.Level := LOG_VERBOSE; // enable shutdown logs for debug
     //TSynLog.Family.Level := LOG_VERBOSE; // enable shutdown logs for debug
-    for i := 0 to servers - 1 do
+    if servers = 1 then
       writeln(ObjectToJsonDebug(rawServers[i].fHttpServer,
       writeln(ObjectToJsonDebug(rawServers[i].fHttpServer,
-        [woDontStoreVoid, woHumanReadable]));
+        [woDontStoreVoid, woHumanReadable]))
+    else
+    begin
+      writeln('Per-server accepted connections:');
+      for i := 0 to servers - 1 do
+        write(rawServers[i].fHttpServer.Async.Accepted, ' ');
+      writeln;
+    end;
   finally
   finally
     // clear all server instance(s)
     // clear all server instance(s)
     ObjArrayClear(rawServers);
     ObjArrayClear(rawServers);
@@ -614,4 +680,3 @@ begin
   WriteHeapStatus(' ', 16, 8, {compileflags=}true);
   WriteHeapStatus(' ', 16, 8, {compileflags=}true);
   {$endif FPC_X64MM}
   {$endif FPC_X64MM}
 end.
 end.
-