Browse Source

* Working thread pool

Michaël Van Canneyt 4 years ago
parent
commit
ab7de66471
1 changed files with 162 additions and 76 deletions
  1. 162 76
      packages/fcl-web/src/base/fphttpserver.pp

+ 162 - 76
packages/fcl-web/src/base/fphttpserver.pp

@@ -62,10 +62,11 @@ Type
 
   TFPHTTPConnection = Class(TObject)
   private
+    FBusy: Boolean;
     FOnError: TRequestErrorHandler;
     FServer: TFPCustomHTTPServer;
     FSocket: TSocketStream;
-    FSetupSocket : Boolean;
+    FIsSocketSetup : Boolean;
     FBuffer : Ansistring;
     FKeepAliveEnabled : Boolean;
     FKeepAlive : Boolean;
@@ -78,12 +79,19 @@ Type
     procedure UnknownHeader(ARequest: TFPHTTPConnectionRequest; const AHeader: String); virtual;
     procedure HandleRequestError(E : Exception); virtual;
     Procedure SetupSocket; virtual;
+    Procedure SetBusy;
+    procedure DoHandleRequest; virtual;
     Function ReadRequestHeaders : TFPHTTPConnectionRequest;
+    // Check if we have keep-alive and no errors occured
+    Function AllowNewRequest : Boolean;
+    // Check if there is a new request pending, i.e. there is data.
     Function RequestPending : Boolean;
+    // True if we're handling a request. Needed to be able to schedule properly.
+    Property Busy : Boolean Read FBusy;
   Public
     Constructor Create(AServer : TFPCustomHTTPServer; ASocket : TSocketStream);
     Destructor Destroy; override;
-    Procedure HandleRequest; virtual;
+    Procedure HandleRequest;
     Property Socket : TSocketStream Read FSocket;
     Property Server : TFPCustomHTTPServer Read FServer;
     Property OnRequestError : TRequestErrorHandler Read FOnError Write FOnError;
@@ -135,17 +143,20 @@ Type
   end;
 
   TConnectionList = Class(TThreadList)
-  Public
-    procedure CloseSockets; virtual;
   end;
 
   { TFPHTTPServerConnectionListHandler }
 
   TFPHTTPServerConnectionListHandler = Class(TFPHTTPServerConnectionHandler)
   Private
+
     FList: TConnectionList;
   Protected
+    Type
+      TConnectionIterator = Procedure (aConnection :TFPHTTPConnection; var aContinue : boolean) of object;
     Function CreateList : TConnectionList;
+    Procedure CloseConnectionSocket(aConnection :TFPHTTPConnection; var aContinue : boolean);
+    Procedure Foreach(aIterator : TConnectionIterator);
     Procedure RemoveConnection(aConnection :TFPHTTPConnection); override;
   Public
     Constructor Create(aServer : TFPCustomHTTPServer); override;
@@ -191,15 +202,21 @@ Type
        { THandleRequestTask }
 
        THandleRequestTask = Class(TThreadPoolTask)
-         Constructor Create(aConnection : TFPHTTPConnection);
        private
          FConnection: TFPHTTPConnection;
+         FOnDone: TNotifyEvent;
        Protected
          procedure DoExecute; override;
        Public
+         Constructor Create(aConnection : TFPHTTPConnection; aOnConnectionDone : TNotifyEvent);
          Property Connection : TFPHTTPConnection Read FConnection;
+         Property OnDone : TNotifyEvent Read FOnDone;
        end;
+    procedure ConnectionDone(Sender: TObject); virtual;
+    procedure ScheduleRequest(aConnection: TFPHTTPConnection);virtual;
+    procedure CheckRequest(aConnection: TFPHTTPConnection; var aContinue : Boolean);virtual;
   Public
+    Procedure CloseSockets; override;
     procedure CheckRequests; override;
     Constructor Create(aServer : TFPCustomHttpServer); override;
     Procedure HandleConnection(aConnection : TFPHTTPConnection); override;
@@ -396,21 +413,33 @@ end;
 
 { TFPPooledConnectionHandler.THandleRequestTask }
 
-constructor TFPPooledConnectionHandler.THandleRequestTask.Create(aConnection: TFPHTTPConnection);
+constructor TFPPooledConnectionHandler.THandleRequestTask.Create(aConnection: TFPHTTPConnection; aOnConnectionDone : TNotifyEvent);
 begin
+  FOnDone:=aOnConnectionDone;
   FConnection:=aConnection;
 end;
 
 procedure TFPPooledConnectionHandler.THandleRequestTask.DoExecute;
 begin
-  Connection.HandleRequest;
+  Try
+    Connection.HandleRequest;
+    if Assigned(FOnDone) then
+      FOnDone(Connection);
+  except
+    On E : Exception do
+      TFPCustomHttpServer.HandleUnexpectedError(E);
+  end;
 end;
 
 { TFPPooledConnectionHandler }
 
 procedure TFPPooledConnectionHandler.CheckRequests;
-begin
 
+begin
+  // First schedule what is already there..
+  FPool.CheckQueuedTasks;
+  // Now maybe add
+  Foreach(@CheckRequest);
 end;
 
 constructor TFPPooledConnectionHandler.Create(aServer: TFPCustomHttpServer);
@@ -419,14 +448,53 @@ begin
   FPool:=CreatePool;
 end;
 
+procedure TFPPooledConnectionHandler.ScheduleRequest(aConnection: TFPHTTPConnection);
+
+begin
+  // So we don't schedule it again while it is waiting to be handled.
+  aConnection.SetBusy;
+  FPool.AddTask(THandleRequestTask.Create(aConnection,@ConnectionDone));
+end;
+
+procedure TFPPooledConnectionHandler.CheckRequest(aConnection: TFPHTTPConnection; var aContinue: Boolean);
+begin
+  if Server.Active and aConnection.AllowNewRequest and aConnection.RequestPending then
+    ScheduleRequest(aConnection);
+end;
+
+procedure TFPPooledConnectionHandler.CloseSockets;
+begin
+  FPool.CancelQueuedTasks;
+  inherited CloseSockets;
+end;
+
+procedure TFPPooledConnectionHandler.ConnectionDone(Sender: TObject);
+
+var
+  aConn : TFPHTTPConnection;
+
+begin
+  aConn:=Sender as TFPHTTPConnection;
+  if Not aConn.AllowNewRequest then
+    RemoveConnection(aConn);
+end;
+
 procedure TFPPooledConnectionHandler.HandleConnection(aConnection: TFPHTTPConnection);
 begin
-  FPool.AddTask(THandleRequestTask.Create(aConnection));
+  Inherited;
+  ScheduleRequest(aConnection);
 end;
 
 function TFPPooledConnectionHandler.CreatePool: TFPCustomSimpleThreadPool;
+
+Var
+  P : TFPSimpleThreadPool;
+
 begin
-  Result:=TFPSimpleThreadPool.Create;
+  P:=TFPSimpleThreadPool.Create;
+  P.AddWaitInterval:=10;
+  P.AddTimeout:=30;
+  Result:=P;
 end;
 
 { TFPHTTPServerConnectionListHandler }
@@ -436,6 +504,29 @@ begin
   Result:=TConnectionList.Create;
 end;
 
+procedure TFPHTTPServerConnectionListHandler.CloseConnectionSocket(aConnection: TFPHTTPConnection; var aContinue: boolean);
+begin
+  sockets.CloseSocket(aConnection.Socket.Handle);
+end;
+
+procedure TFPHTTPServerConnectionListHandler.Foreach(aIterator: TConnectionIterator);
+Var
+  L : TList;
+  aContinue : Boolean;
+  I : Integer;
+
+begin
+  aContinue:=True;
+  L:=FList.LockList;
+  try
+    For I:=L.Count-1 downto 0 do
+      if aContinue then
+        aIterator(TFPHTTPConnection(L[i]),aContinue);
+  finally
+    FList.UnlockList;
+  end;
+end;
+
 procedure TFPHTTPServerConnectionListHandler.RemoveConnection(aConnection: TFPHTTPConnection);
 begin
   Flist.Remove(aConnection);
@@ -455,7 +546,7 @@ end;
 
 procedure TFPHTTPServerConnectionListHandler.CloseSockets;
 begin
-  FList.CloseSockets;
+  Foreach(@CloseConnectionSocket);
 end;
 
 function TFPHTTPServerConnectionListHandler.GetActiveConnectionCount: Integer;
@@ -521,27 +612,10 @@ end;
 
 procedure TFPSimpleConnectionHandler.CloseSockets;
 begin
-  if Assigned(FCOnnection) then
+  if Assigned(FConnection) then
     sockets.CloseSocket(FConnection.Socket.Handle);
 end;
 
-{ TConnectionList }
-
-procedure TConnectionList.CloseSockets;
-
-Var
-  L : TList;
-  I : Integer;
-
-begin
-  L:=LockList;
-  try
-    for I:= L.Count-1 downto 0 do
-      sockets.CloseSocket(TFPHTTPConnection(L[I]).Socket.Handle);
-  finally
-    UnlockList;
-  end;
-end;
 
 { TFPHTTPServerConnectionHandler }
 
@@ -711,6 +785,12 @@ begin
   FSocket.ReadFlags:=MSG_NOSIGNAL;
   FSocket.WriteFlags:=MSG_NOSIGNAL;
 {$endif}
+  FIsSocketSetup:=True;
+end;
+
+procedure TFPHTTPConnection.SetBusy;
+begin
+  FBusy:=True;
 end;
 
 Procedure TFPHTTPConnection.InterPretHeader(ARequest : TFPHTTPConnectionRequest; Const AHeader : String);
@@ -833,6 +913,11 @@ begin
   end;
 end;
 
+function TFPHTTPConnection.AllowNewRequest: Boolean;
+begin
+  Result:=not Busy and KeepAliveEnabled and KeepAlive and (Socket.LastError=0);
+end;
+
 function TFPHTTPConnection.RequestPending: Boolean;
 begin
   Result:=Socket.CanRead(KeepAliveTimeout);
@@ -840,8 +925,8 @@ end;
 
 constructor TFPHTTPConnection.Create(AServer: TFPCustomHttpServer; ASocket: TSocketStream);
 begin
+  FIsSocketSetup:=False;
   FSocket:=ASocket;
-  FSetupSocket:=True;
   FServer:=AServer;
   KeepAliveTimeout:=DefaultKeepaliveTimeout;
 end;
@@ -861,54 +946,59 @@ begin
     Result:=False;  
 end;
 
-procedure TFPHTTPConnection.HandleRequest;
+procedure TFPHTTPConnection.DoHandleRequest;
 
 Var
   Req : TFPHTTPConnectionRequest;
   Resp : TFPHTTPConnectionResponse;
 
 begin
-  Try
-    if FSetupSocket then
+  // Read headers.
+  Resp:=Nil;
+  Req:=ReadRequestHeaders;
+  try
+    //set port
+    Req.ServerPort := Server.Port;
+    // Read content, if any
+    If Req.ContentLength>0 then
+      ReadRequestContent(Req);
+    Req.InitRequestVars;
+    if KeepAliveEnabled then
       begin
-      SetupSocket;
-      FSetupSocket:=False;
+      // Read out keep-alive
+      FKeepAlive:=Req.HttpVersion='1.1'; // keep-alive is default on HTTP 1.1
+      if SameText(Req.GetHeader(hhConnection),'close') then
+        FKeepAlive:=False
+      else if SameText(Req.GetHeader(hhConnection),'keep-alive') then
+        FKeepAlive:=True;
       end;
-    // Read headers.
-    Resp:=Nil;
-    Req:=ReadRequestHeaders;
-    try
-      //set port
-      Req.ServerPort := Server.Port;
-      // Read content, if any
-      If Req.ContentLength>0 then
-        ReadRequestContent(Req);
-      Req.InitRequestVars;
-      if KeepAliveEnabled then
-        begin
-        // Read out keep-alive
-        FKeepAlive:=Req.HttpVersion='1.1'; // keep-alive is default on HTTP 1.1
-        if SameText(Req.GetHeader(hhConnection),'close') then
-          FKeepAlive:=False
-        else if SameText(Req.GetHeader(hhConnection),'keep-alive') then
-          FKeepAlive:=True;
-        end;
-      // Create Response
-      Resp:= Server.CreateResponse(Req);
-      Server.InitResponse(Resp);
-      // We set the header here now. User can override it when needed.
-      if FKeepAlive and (Req.HttpVersion='1.0') and not Resp.HeaderIsSet(hhConnection) then
-        Resp.SetHeader(hhConnection,'keep-alive');
-      Resp.FConnection:=Self;
-      // And dispatch
-      if Server.Active then
-        Server.HandleRequest(Req,Resp);
-      if Assigned(Resp) and (not Resp.ContentSent) then
-        Resp.SendContent;
-    Finally
-      FreeAndNil(Resp);
-      FreeAndNil(Req);
-    end;
+    // Create Response
+    Resp:= Server.CreateResponse(Req);
+    Server.InitResponse(Resp);
+    // We set the header here now. User can override it when needed.
+    if FKeepAlive and (Req.HttpVersion='1.0') and not Resp.HeaderIsSet(hhConnection) then
+      Resp.SetHeader(hhConnection,'keep-alive');
+    Resp.FConnection:=Self;
+    // And dispatch
+    if Server.Active then
+      Server.HandleRequest(Req,Resp);
+    if Assigned(Resp) and (not Resp.ContentSent) then
+      Resp.SendContent;
+  Finally
+    FreeAndNil(Resp);
+    FreeAndNil(Req);
+  end;
+end;
+
+procedure TFPHTTPConnection.HandleRequest;
+
+
+begin
+  FBusy:=True;
+  Try
+    if not FIsSocketSetup then
+      SetupSocket;
+    DoHandleRequest;
   Except
     On E : Exception do
       begin
@@ -916,6 +1006,7 @@ begin
       HandleRequestError(E);
       end;
   end;
+  FBusy:=False;
 end;
 
 { TFPHTTPConnectionThread }
@@ -931,16 +1022,11 @@ end;
 
 procedure TFPHTTPConnectionThread.Execute;
 
-  Function AllowReading : Boolean; // inline;
-  begin
-    Result:=not Terminated and Connection.KeepAliveEnabled and Connection.KeepAlive
-  end;
-
 begin
   try
     // Always handle first request
     Connection.HandleRequest;
-    While AllowReading and (FConnection.Socket.LastError=0) do
+    While not Terminated and Connection.AllowNewRequest do
       if Connection.RequestPending then
         Connection.HandleRequest;
   except