Browse Source

* Applied patch from Ondrej to fix thread ID not being unique

git-svn-id: trunk@31955 -
michael 9 years ago
parent
commit
bfc85a73bd
1 changed files with 175 additions and 77 deletions
  1. 175 77
      packages/fcl-base/src/advancedipc.pp

+ 175 - 77
packages/fcl-base/src/advancedipc.pp

@@ -69,12 +69,13 @@ type
     function RequestFileNameToID(const aFileName: string): Integer;
     function RequestExists(const aRequestFileName: string): Boolean;
 
-    function GetUniqueRequest(out outFileName: string): Integer;
     procedure SetServerID(const aServerID: string); virtual;
     procedure SetGlobal(const aGlobal: Boolean); virtual;
 
     function CanReadMessage(const aFileName: string; out outStream: TStream; out outMsgType: TMessageType; out outMsgLen: Integer): Boolean;
-    procedure DoPostMessage(const aFileName: string; const aMsgType: TMessageType; const aStream: TStream);
+    procedure DoPostMessage(const aFileName: string; const aMsgType: TMessageType; const aStream: TStream); overload;
+    procedure DoPostMessage(const aFileStream: TFileStream; const aMsgType: TMessageType; const aStream: TStream); overload;
+    function DoReadMessage(const aFileName: string; const aStream: TStream; out outMsgType: TMessageType): Boolean;
 
     property FileName: string read FFileName;
   public
@@ -82,9 +83,9 @@ type
       const outServerIDs: TStrings; const aGlobal: Boolean = False);
     class function ServerRunning(const aServerID: string; const aGlobal: Boolean = False): Boolean; overload;
   public
-    //ServerID: name/ID of the server. Use only ['a'..'z', 'A'..'Z', '_'] characters
+    //ServerID: name/ID of the server. Use only ['a'..'z', 'A'..'Z', '0'..'9', '_'] characters
     property ServerID: string read FServerID write SetServerID;
-    //Global: if true, processes from different users can communicate; false, processes only from current users can communicate
+    //Global: if true, processes from different users can communicate; false, processes only from current user can communicate
     property Global: Boolean read FGlobal write SetGlobal;
     //MessageVersion: only messages with the same MessageVersion can be delivered between server/client
     property MessageVersion: Integer read FMessageVersion write FMessageVersion;
@@ -92,7 +93,12 @@ type
 
   TIPCClient = class(TIPCBase)
   private
-    FLastMsgFileName: string;
+    FLastRequestID: Integer;
+
+    function CreateUniqueRequest(out outFileStream: TFileStream): Integer;
+    function DoPeekResponse(const aResponseFileName: string; const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
+  public
+    constructor Create(aOwner: TComponent); override;
   public
     //post request to server, do not wait until request is peeked; returns request ID
     function PostRequest(const aMsgType: TMessageType; const aStream: TStream): Integer;
@@ -100,9 +106,15 @@ type
     function SendRequest(const aMsgType: TMessageType; const aStream: TStream; const aTimeOut: Integer): Boolean;
     function SendRequest(const aMsgType: TMessageType; const aStream: TStream; const aTimeOut: Integer; out outRequestID: Integer): Boolean;
     //peek a response from last request from this client
-    function PeekResponse(const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
-    //delete last request from this client
-    procedure DeleteRequest;
+    function PeekResponse(const aStream: TStream; out outMsgType: TMessageType): Boolean; overload;
+    function PeekResponse(const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
+    //peek a response from request by ID
+    function PeekResponse(const aRequestID: Integer; const aStream: TStream; out outMsgType: TMessageType): Boolean; overload;
+    function PeekResponse(const aRequestID: Integer; const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
+    //delete last request from this client, returns true if request file existed and was deleted
+    function DeleteRequest: Boolean; overload;
+    //delete request by ID, returns true if request existed file and was deleted
+    function DeleteRequest(const aRequestID: Integer): Boolean; overload;
     //check if server is running
     function ServerRunning: Boolean; overload;
   end;
@@ -131,8 +143,8 @@ type
     function PeekRequest(out outRequestID: Integer; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
     //read a peeked request (that hasn't been read yet)
     function ReadRequest(const aRequestID: Integer; const aStream: TStream): Boolean;
-    //delete a peeked request (that hasn't been read yet)
-    procedure DeleteRequest(const aRequestID: Integer);
+    //delete a peeked request (that hasn't been read yet), returns true if request file existed and was deleted
+    function DeleteRequest(const aRequestID: Integer): Boolean;
 
     //post response to a request
     procedure PostResponse(const aRequestID: Integer; const aMsgType: TMessageType; const aStream: TStream);
@@ -163,6 +175,9 @@ resourcestring
 
 implementation
 
+type
+  TIPCSearchRec = TRawByteSearchRec;
+
 const
   {$IFDEF UNIX}
   GLOBAL_RIGHTS = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH or S_IWOTH;
@@ -170,6 +185,9 @@ const
   GLOBAL_RIGHTS = 0;
   {$ENDIF}
 
+var
+  CreateUniqueRequestCritSec: TRTLCriticalSection;
+
 { TIPCBase }
 
 function TIPCBase.CanReadMessage(const aFileName: string; out
@@ -214,15 +232,27 @@ begin
   outMsgLen := xHeader.MsgLen;
 end;
 
-function TIPCBase.GetUniqueRequest(out outFileName: string): Integer;
+function TIPCBase.DoReadMessage(const aFileName: string;
+  const aStream: TStream; out outMsgType: TMessageType): Boolean;
+var
+  xStream: TStream;
+  xMsgLen: Integer;
 begin
-  Randomize;
-  repeat
-    //if Randomize/Random is started from 2 processes at exactly same moment, it returns the same number! -> prevent duplicates by xor GetCurrentThreadId
-    //the result must be of range 0..$7FFFFFFF (High(Integer))
-    Result := Integer((PtrInt(Random($7FFFFFFF)) xor PtrInt(GetCurrentThreadId)) and $7FFFFFFF);
-    outFileName := GetRequestFileName(Result);
-  until not RequestExists(outFileName);
+  aStream.Size := 0;
+  xStream := nil;
+  try
+    Result := CanReadMessage(aFileName, xStream, outMsgType, xMsgLen);
+    if Result then
+    begin
+      if xMsgLen > 0 then
+        aStream.CopyFrom(xStream, xMsgLen);
+      FreeAndNil(xStream);
+      aStream.Position := 0;
+      DeleteFile(aFileName);
+    end;
+  finally
+    xStream.Free;
+  end;
 end;
 
 function TIPCBase.RequestExists(const aRequestFileName: string): Boolean;
@@ -267,8 +297,20 @@ end;
 procedure TIPCBase.DoPostMessage(const aFileName: string;
   const aMsgType: TMessageType; const aStream: TStream);
 var
-  xHeader: TMessageHeader;
   xStream: TFileStream;
+begin
+  xStream := TFileStream.Create(aFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
+  try
+    DoPostMessage(xStream, aMsgType, aStream);
+  finally
+    xStream.Free;
+  end;
+end;
+
+procedure TIPCBase.DoPostMessage(const aFileStream: TFileStream;
+  const aMsgType: TMessageType; const aStream: TStream);
+var
+  xHeader: TMessageHeader;
 begin
   xHeader.HeaderVersion := HEADER_VERSION;
   xHeader.FileLock := 1;//locking
@@ -279,18 +321,14 @@ begin
     xHeader.MsgLen := 0;
   xHeader.MsgVersion := MessageVersion;
 
-  xStream := TFileStream.Create(aFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
-  try
-    xStream.WriteBuffer(xHeader, SizeOf(xHeader));
-    if Assigned(aStream) and (aStream.Size-aStream.Position > 0) then
-      xStream.CopyFrom(aStream, aStream.Size-aStream.Position);
+  aFileStream.WriteBuffer(xHeader, SizeOf(xHeader));
+  if Assigned(aStream) and (aStream.Size-aStream.Position > 0) then
+    aFileStream.CopyFrom(aStream, aStream.Size-aStream.Position);
 
-    xStream.Position := 0;//unlocking
-    xHeader.FileLock := 0;
-    xStream.WriteBuffer(xHeader, SizeOf(xHeader));
-  finally
-    xStream.Free;
-  end;
+  aFileStream.Position := 0;//unlocking
+  xHeader.FileLock := 0;
+  aFileStream.WriteBuffer(xHeader, SizeOf(xHeader));
+  aFileStream.Seek(0, soEnd);
 end;
 
 function TIPCBase.RequestFileNameToID(const aFileName: string): Integer;
@@ -306,7 +344,7 @@ end;
 class procedure TIPCBase.FindRunningServers(const aServerIDPrefix: string;
   const outServerIDs: TStrings; const aGlobal: Boolean);
 var
-  xRec: TRawByteSearchRec;
+  xRec: TIPCSearchRec;
 begin
   if FindFirst(ServerIDToFileName(aServerIDPrefix+AllFilesMask, aGlobal), faAnyFile, xRec) = 0 then
   begin
@@ -368,45 +406,107 @@ end;
 
 { TIPCClient }
 
-procedure TIPCClient.DeleteRequest;
+constructor TIPCClient.Create(aOwner: TComponent);
 begin
-  if DeleteFile(FLastMsgFileName) then
-    FLastMsgFileName := '';
+  inherited Create(aOwner);
+
+  FLastRequestID := -1;
 end;
 
-function TIPCClient.PeekResponse(const aStream: TStream; out
-  outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
+function TIPCClient.DeleteRequest(const aRequestID: Integer): Boolean;
+var
+  xRequestFileName: string;
+begin
+  xRequestFileName := GetRequestFileName(aRequestID);
+  Result := DeleteFile(xRequestFileName);
+  if (aRequestID = FLastRequestID) and not FileExists(xRequestFileName) then
+    FLastRequestID := -1;
+end;
+
+function TIPCClient.DeleteRequest: Boolean;
+begin
+  if FLastRequestID >= 0 then
+    Result := DeleteRequest(FLastRequestID)
+  else
+    Result := False;
+end;
+
+function TIPCClient.DoPeekResponse(const aResponseFileName: string;
+  const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer
+  ): Boolean;
 var
   xStart: QWord;
-  xStream: TStream;
-  xMsgLen: Integer;
-  xFileResponse: string;
 begin
   aStream.Size := 0;
   Result := False;
   xStart := GetTickCount64;
   repeat
-    xFileResponse := GetResponseFileName(FLastMsgFileName);
-    if CanReadMessage(xFileResponse, xStream, outMsgType, xMsgLen) then
-    begin
-      if xMsgLen > 0 then
-        aStream.CopyFrom(xStream, xMsgLen);
-      xStream.Free;
-      aStream.Position := 0;
-      DeleteFile(xFileResponse);
-      Exit(True);
-    end
+    if DoReadMessage(aResponseFileName, aStream, outMsgType) then
+      Exit(True)
     else if aTimeOut > 20 then
       Sleep(10);
   until (GetTickCount64-xStart > aTimeOut);
 end;
 
+function TIPCClient.CreateUniqueRequest(out outFileStream: TFileStream): Integer;
+var
+  xFileName: string;
+begin
+  outFileStream := nil;
+  EnterCriticalsection(CreateUniqueRequestCritSec);
+  try
+    Randomize;
+    repeat
+      //if Randomize/Random is started from 2 processes at exactly same moment, it returns the same number! -> prevent duplicates by xor GetProcessId
+      //the result must be of range 0..$7FFFFFFF (High(Integer))
+      Result := Integer((PtrInt(Random($7FFFFFFF)) xor PtrInt(GetProcessID)) and $7FFFFFFF);
+      xFileName := GetRequestFileName(Result);
+    until not RequestExists(xFileName);
+
+    outFileStream := TFileStream.Create(xFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
+  finally
+    LeaveCriticalsection(CreateUniqueRequestCritSec);
+  end;
+end;
+
+function TIPCClient.PeekResponse(const aRequestID: Integer;
+  const aStream: TStream; out outMsgType: TMessageType): Boolean;
+begin
+  Result := DoReadMessage(GetResponseFileName(aRequestID), aStream, outMsgType);
+end;
+
+function TIPCClient.PeekResponse(const aRequestID: Integer;
+  const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer
+  ): Boolean;
+begin
+  Result := DoPeekResponse(GetResponseFileName(aRequestID), aStream, outMsgType, aTimeOut);
+end;
+
+function TIPCClient.PeekResponse(const aStream: TStream; out
+  outMsgType: TMessageType): Boolean;
+begin
+  Result := DoReadMessage(GetResponseFileName(FLastRequestID), aStream, outMsgType);
+end;
+
+function TIPCClient.PeekResponse(const aStream: TStream; out
+  outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
+begin
+  Result := DoPeekResponse(GetResponseFileName(FLastRequestID), aStream, outMsgType, aTimeOut);
+end;
+
 function TIPCClient.PostRequest(const aMsgType: TMessageType;
   const aStream: TStream): Integer;
+var
+  xRequestFileStream: TFileStream;
 begin
-  Result := GetUniqueRequest(FLastMsgFileName);
-  DeleteFile(GetResponseFileName(FLastMsgFileName));//delete old response, if there is any
-  DoPostMessage(FLastMsgFileName, aMsgType, aStream);
+  xRequestFileStream := nil;
+  try
+    Result := CreateUniqueRequest(xRequestFileStream);
+    DoPostMessage(xRequestFileStream, aMsgType, aStream);
+  finally
+    xRequestFileStream.Free;
+  end;
+  FLastRequestID := Result;
 end;
 
 function TIPCClient.SendRequest(const aMsgType: TMessageType;
@@ -455,7 +555,7 @@ end;
 
 procedure TIPCServer.DeletePendingRequests;
 var
-  xRec: TRawByteSearchRec;
+  xRec: TIPCSearchRec;
   xDir: string;
 begin
   xDir := ExtractFilePath(FFileName);
@@ -468,9 +568,9 @@ begin
   FindClose(xRec);
 end;
 
-procedure TIPCServer.DeleteRequest(const aRequestID: Integer);
+function TIPCServer.DeleteRequest(const aRequestID: Integer): Boolean;
 begin
-  DeleteFile(GetPeekedRequestFileName(aRequestID));
+  Result := DeleteFile(GetPeekedRequestFileName(aRequestID));
 end;
 
 constructor TIPCServer.Create(aOwner: TComponent);
@@ -492,7 +592,7 @@ function TIPCServer.FindFirstRequest(out outFileName: string; out
   outStream: TStream; out outMsgType: TMessageType; out outMsgLen: Integer
   ): Integer;
 var
-  xRec: TRawByteSearchRec;
+  xRec: TIPCSearchRec;
 begin
   outFileName := '';
   outStream := nil;
@@ -516,7 +616,7 @@ end;
 
 function TIPCServer.FindHighestPendingRequestId: Integer;
 var
-  xRec: TRawByteSearchRec;
+  xRec: TIPCSearchRec;
   xRequestID: LongInt;
 begin
   Result := -1;
@@ -533,7 +633,7 @@ end;
 
 function TIPCServer.GetPendingRequestCount: Integer;
 var
-  xRec: TRawByteSearchRec;
+  xRec: TIPCSearchRec;
 begin
   Result := 0;
   if FindFirst(GetRequestPrefix+AllFilesMask, faAnyFile, xRec) = 0 then
@@ -555,12 +655,17 @@ var
 begin
   outMsgType := -1;
   xMsgFileName := '';
-  outRequestID := FindFirstRequest(xMsgFileName, xStream, outMsgType, xMsgLen);
-  Result := outRequestID >= 0;
-  if Result then
-  begin
+  xStream := nil;
+  try
+    outRequestID := FindFirstRequest(xMsgFileName, xStream, outMsgType, xMsgLen);
+    Result := outRequestID >= 0;
+    if Result then
+    begin
+      FreeAndNil(xStream);
+      RenameFile(xMsgFileName, GetPeekedRequestFileName(xMsgFileName));
+    end;
+  finally
     xStream.Free;
-    RenameFile(xMsgFileName, GetPeekedRequestFileName(xMsgFileName));
   end;
 end;
 
@@ -619,22 +724,9 @@ end;
 function TIPCServer.ReadRequest(const aRequestID: Integer; const aStream: TStream
   ): Boolean;
 var
-  xStream: TStream;
-  xMsgLen: Integer;
   xMsgType: TMessageType;
-  xFileRequest: string;
 begin
-  aStream.Size := 0;
-  xFileRequest := GetPeekedRequestFileName(aRequestID);
-  Result := CanReadMessage(xFileRequest, xStream, xMsgType, xMsgLen);
-  if Result then
-  begin
-    if xMsgLen > 0 then
-      aStream.CopyFrom(xStream, xMsgLen);
-    xStream.Free;
-    aStream.Position := 0;
-    DeleteFile(xFileRequest);
-  end;
+  Result := DoReadMessage(GetPeekedRequestFileName(aRequestID), aStream, xMsgType);
 end;
 
 procedure TIPCServer.SetGlobal(const aGlobal: Boolean);
@@ -680,4 +772,10 @@ begin
   FActive := False;
 end;
 
+initialization
+  InitCriticalSection(CreateUniqueRequestCritSec);
+
+finalization
+  DoneCriticalsection(CreateUniqueRequestCritSec);
+
 end.