Forráskód Böngészése

* Allow server to run message queue in a thread

git-svn-id: trunk@33705 -
michael 9 éve
szülő
commit
365a246c9a

+ 1 - 0
packages/fcl-process/examples/checkipcserver.lpi

@@ -40,6 +40,7 @@
     </Target>
     <SearchPaths>
       <IncludeFiles Value="$(ProjOutDir)"/>
+      <OtherUnitFiles Value="../units/$(TargetCPU)-$(TargetOS)"/>
       <UnitOutputDirectory Value="lib/$(TargetCPU)-$(TargetOS)"/>
     </SearchPaths>
   </CompilerOptions>

+ 13 - 4
packages/fcl-process/examples/ipcclient.pp

@@ -6,6 +6,7 @@ uses sysutils,simpleipc;
 
 Var
   I,Count : Integer;
+  DoStop : Boolean;
 
 begin
   Count:=1;
@@ -13,11 +14,19 @@ begin
     try
       ServerID:='ipcserver';
       If (ParamCount>0) then
-        ServerInstance:=Paramstr(1);
-      if ParamCount>1 then
-        Count:=StrToIntDef(ParamStr(2),1);  
+        begin
+        DoStop:=(ParamStr(1)='-s') or (paramstr(1)='--stop');
+        if DoStop then
+          ServerInstance:=Paramstr(2)
+        else  
+          ServerInstance:=Paramstr(1);
+        if (Not DoStop) and (ParamCount>1) then
+          Count:=StrToIntDef(ParamStr(2),1);  
+        end;  
       Active:=True;
-      for I:=1 to Count do
+      if DoStop then
+        SendStringMessage('stop')
+      else  for I:=1 to Count do
         SendStringMessage(Format('Testmessage %d from client',[i]));
       Active:=False;
     finally

+ 3 - 1
packages/fcl-process/examples/ipcserver.lpi

@@ -6,7 +6,6 @@
       <Flags>
         <MainUnitHasCreateFormStatements Value="False"/>
         <MainUnitHasTitleStatement Value="False"/>
-        <UseDefaultCompilerOptions Value="True"/>
       </Flags>
       <SessionStorage Value="InProjectDir"/>
       <MainUnit Value="0"/>
@@ -29,6 +28,7 @@
     <RunParams>
       <local>
         <FormatVersion Value="1"/>
+        <CommandLineParams Value="-t"/>
       </local>
     </RunParams>
     <Units Count="1">
@@ -44,6 +44,8 @@
       <Filename Value="ipcserver"/>
     </Target>
     <SearchPaths>
+      <IncludeFiles Value="$(ProjOutDir)"/>
+      <OtherUnitFiles Value="../units/$(TargetCPU)-$(TargetOS)"/>
       <UnitOutputDirectory Value="lib/$(TargetCPU)-$(TargetOS)"/>
     </SearchPaths>
   </CompilerOptions>

+ 55 - 7
packages/fcl-process/examples/ipcserver.pp

@@ -5,31 +5,79 @@ program ipcserver;
 {$APPTYPE CONSOLE}
 
 uses
+  {$ifdef unix}cthreads,{$endif}
   SysUtils,
+  Classes,
   simpleipc;
 
+Type
+  TApp = Class(TObject)  
+    Srv : TSimpleIPCServer;
+    DoStop : Boolean;
+    Procedure MessageQueued(Sender : TObject);
+    procedure Run;
+    Procedure PrintMessage;
+  end;
+
+Procedure TApp.PrintMessage;
+
+Var
+  S : String;
+ 
+begin
+  S:=Srv.StringMessage;
+  Writeln('Received message : ',S);
+  DoStop:=DoStop or (S='stop');
+end;
+
+Procedure TApp.MessageQueued(Sender : TObject);
+
+begin
+  Srv.ReadMessage;
+  PrintMessage;
+end;
+
+
+Procedure TApp.Run;
+  
 Var
-  Srv : TSimpleIPCServer;
   S : String;
+  Threaded : Boolean;
 
 begin
   Srv:=TSimpleIPCServer.Create(Nil);
   Try
+    S:= ParamStr(1);
+    Threaded:=(S='-t') or (S='--threaded');
     Srv.ServerID:='ipcserver';
     Srv.Global:=True;
-    Srv.StartServer;
-    Writeln('Server started. Listening for messages');
+    if Threaded then
+      Srv.OnMessageQueued:=@MessageQueued;
+    Srv.StartServer(Threaded);
+    
+    Writeln('Server started. Listening for messages. Send "stop" message to stop server.');
     Repeat
-      If Srv.PeekMessage(1,True) then
+      If Threaded then
         begin
-        S:=Srv.StringMessage;
-        Writeln('Received message : ',S);
+        Sleep(10);
+        CheckSynchronize;
         end
+      else if Srv.PeekMessage(10,True) then
+        PrintMessage
       else
         Sleep(10);
-    Until CompareText(S,'stop')=0;
+    Until DoStop;
   Finally
     Srv.Free;
   end;
+end;
+
+begin
+  With TApp.Create do
+    try
+      Run
+    finally
+      Free;
+    end;    
 end.
 

+ 150 - 7
packages/fcl-process/src/simpleipc.pp

@@ -24,7 +24,8 @@ uses
 
 Const
   MsgVersion = 1;
-  
+  DefaultThreadTimeOut = 50;
+
   //Message types
   mtUnknown = 0;
   mtString = 1;
@@ -33,7 +34,6 @@ type
   TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
 
 var
-  // Currently implemented only for Windows platform!
   DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
   DefaultIPCMessageQueueLimit: Integer = 0;
 
@@ -125,32 +125,46 @@ Type
 
   { TSimpleIPCServer }
 
+  TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
 
   TSimpleIPCServer = Class(TSimpleIPC)
   protected
   Private
+    FOnMessageError: TMessageQueueEvent;
+    FOnMessageQueued: TNotifyEvent;
     FQueue : TIPCServerMsgQueue;
     FGlobal: Boolean;
     FOnMessage: TNotifyEvent;
     FMsgType: TMessageType;
     FMsgData : TStream;
+    FThreadTimeOut: Integer;
+    FThread : TThread;
+    FLock : TRTLCriticalSection;
+    FErrMsg : TIPCServerMsg;
+    procedure DoMessageQueued;
+    procedure DoMessageError;
     function GetInstanceID: String;
     function GetMaxAction: TIPCMessageOverflowAction;
+    function GetMaxQueue: Integer;
     function GetStringMessage: String;
     procedure SetGlobal(const AValue: Boolean);
     procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
+    procedure SetMaxQueue(AValue: Integer);
   Protected
     FIPCComm: TIPCServerComm;
+    procedure StartThread; virtual;
+    procedure StopThread; virtual;
     Function CommClass : TIPCServerCommClass; virtual;
     Procedure PushMessage(Msg : TIPCServerMsg); virtual;
     function PopMessage: Boolean; virtual;
     Procedure Activate; override;
     Procedure Deactivate; override;
     Property Queue : TIPCServerMsgQueue Read FQueue;
+    Property Thread : TThread Read FThread;
   Public
     Constructor Create(AOwner : TComponent); override;
     Destructor Destroy; override;
-    Procedure StartServer;
+    Procedure StartServer(Threaded : Boolean = False);
     Procedure StopServer;
     Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
     Procedure ReadMessage;
@@ -160,8 +174,17 @@ Type
     Property  MsgData : TStream Read FMsgData;
     Property  InstanceID : String Read GetInstanceID;
   Published
+    Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
     Property Global : Boolean Read FGlobal Write SetGlobal;
+    // Called during ReadMessage
     Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
+    // Called when a message is pushed on the queue.
+    Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
+    // Called when the queue overflows and  MaxAction = ipcmoaError.
+    Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
+    // Maximum number of messages to keep in the queue
+    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
+    // What to do when the queue overflows
     property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
   end;
 
@@ -464,12 +487,13 @@ begin
   FBusy:=False;
   FMsgData:=TStringStream.Create('');
   FQueue:=TIPCServerMsgQueue.Create;
+  FThreadTimeOut:=DefaultThreadTimeOut;
 end;
 
 destructor TSimpleIPCServer.Destroy;
 begin
-  FreeAndNil(FQueue);
   Active:=False;
+  FreeAndNil(FQueue);
   FreeAndNil(FMsgData);
   inherited Destroy;
 end;
@@ -488,6 +512,11 @@ begin
   FQueue.MaxAction:=AValue;
 end;
 
+procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
+begin
+  FQueue.MaxCount:=AValue;
+end;
+
 function TSimpleIPCServer.GetInstanceID: String;
 begin
   Result:=FIPCComm.InstanceID;
@@ -498,6 +527,11 @@ begin
   Result:=FQueue.MaxAction;
 end;
 
+function TSimpleIPCServer.GetMaxQueue: Integer;
+begin
+  Result:=FQueue.MaxCount;
+end;
+
 
 function TSimpleIPCServer.GetStringMessage: String;
 begin
@@ -505,7 +539,7 @@ begin
 end;
 
 
-procedure TSimpleIPCServer.StartServer;
+procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
 begin
   if Not Assigned(FIPCComm) then
     begin
@@ -515,10 +549,62 @@ begin
     FIPCComm.StartServer;
     end;
   FActive:=True;
+  If Threaded then
+    StartThread;
+end;
+
+Type
+
+  { TServerThread }
+
+  TServerThread = Class(TThread)
+  private
+    FServer: TSimpleIPCServer;
+    FThreadTimeout: Integer;
+  Public
+    Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
+    procedure Execute; override;
+    Property Server : TSimpleIPCServer Read FServer;
+    Property ThreadTimeout : Integer Read FThreadTimeout;
+  end;
+
+{ TServerThread }
+
+constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
+begin
+  FServer:=AServer;
+  FThreadTimeout:=ATimeOut;
+  Inherited Create(False);
+end;
+
+procedure TServerThread.Execute;
+begin
+  While Not Terminated do
+    FServer.PeekMessage(ThreadTimeout,False);
+end;
+
+procedure TSimpleIPCServer.StartThread;
+
+begin
+  InitCriticalSection(FLock);
+  FThread:=TServerThread.Create(Self,ThreadTimeOut);
+end;
+
+procedure TSimpleIPCServer.StopThread;
+
+begin
+  if Assigned(FThread) then
+    begin
+    FThread.Terminate;
+    FThread.WaitFor;
+    FreeAndNil(FThread);
+    DoneCriticalSection(FLock);
+    end;
 end;
 
 procedure TSimpleIPCServer.StopServer;
 begin
+  StopThread;
   If Assigned(FIPCComm) then
     begin
     FIPCComm.StopServer;
@@ -529,7 +615,7 @@ begin
 end;
 
 // TimeOut values:
-//   >  0  -- umber of milliseconds to wait
+//   >  0  -- Number of milliseconds to wait
 //   =  0  -- return immediately
 //   = -1  -- wait infinitely
 //   < -1  -- wait infinitely (force to -1)
@@ -557,9 +643,17 @@ function TSimpleIPCServer.PopMessage: Boolean;
 
 var
   MsgItem: TIPCServerMsg;
+  DoLock : Boolean;
 
 begin
-  MsgItem:=FQueue.Pop;
+  DoLock:=Assigned(FThread);
+  if DoLock then
+    EnterCriticalsection(Flock);
+  try
+    MsgItem:=FQueue.Pop;
+  finally
+    LeaveCriticalsection(FLock);
+  end;
   Result:=Assigned(MsgItem);
   if Result then
     try
@@ -605,6 +699,55 @@ begin
 end;
 
 
+procedure TSimpleIPCServer.DoMessageQueued;
+
+begin
+  if Assigned(FOnMessageQueued) then
+    FOnMessageQueued(Self);
+end;
+
+procedure TSimpleIPCServer.DoMessageError;
+begin
+  try
+    if Assigned(FOnMessageQueued) then
+      FOnMessageError(Self,FErrMsg);
+  finally
+    FreeAndNil(FErrMsg)
+  end;
+end;
+
+procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
+
+Var
+  DoLock : Boolean;
+
+begin
+  try
+    DoLock:=Assigned(FThread);
+    If DoLock then
+      EnterCriticalsection(FLock);
+    try
+      Queue.Push(Msg);
+    finally
+      If DoLock then
+        LeaveCriticalsection(FLock);
+    end;
+    if DoLock then
+      TThread.Synchronize(FThread,@DoMessageQueued)
+    else
+      DoMessageQueued;
+  except
+    On E : Exception do
+      FErrMsg:=Msg;
+  end;
+  if Assigned(FErrMsg) then
+    if DoLock then
+      TThread.Synchronize(FThread,@DoMessageError)
+    else
+      DoMessageQueued;
+
+end;
+
 
 
 { ---------------------------------------------------------------------

+ 0 - 5
packages/fcl-process/src/unix/simpleipc.inc

@@ -229,11 +229,6 @@ begin
     Result:=TPipeServerComm;
 end;
 
-procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
-begin
-  Queue.Push(Msg);
-end;
-
 function TSimpleIPCClient.CommClass: TIPCClientCommClass;
 begin
   if (DefaultIPCClientClass<>Nil) then