Forráskód Böngészése

* Now different handlers for available data and space in write buffer can
be set
* LOTS of bugfixes in the implementation
* fpAsync now has a write buffer class (a read buffer class for reading
line by line will be included in the next release)

sg 24 éve
szülő
commit
f337da642b

+ 222 - 1
packages/asyncio/fpasync/fpasync.inc

@@ -165,6 +165,72 @@ begin
   FreeNotifyData(Self, Data);
 end;
 
+procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
+  AUserData: Pointer);
+begin
+  asyncSetDataAvailableCallback(Handle, AHandle, ACallback, AUserData);
+end;
+
+procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
+begin
+  asyncClearDataAvailableCallback(Handle, AHandle);
+end;
+
+function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
+  ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.FileHandle := AHandle;
+  asyncSetDataAvailableCallback(Handle, AHandle, @EventHandler, UserData);
+  Result := UserData;
+end;
+
+procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
+  FreeNotifyData(Self, Data);
+end;
+
+procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
+  AUserData: Pointer);
+begin
+  asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
+end;
+
+procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
+begin
+  asyncClearCanWriteCallback(Handle, AHandle);
+end;
+
+function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
+  ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.FileHandle := AHandle;
+  asyncSetCanWriteCallback(Handle, AHandle, @EventHandler, UserData);
+  Result := UserData;
+end;
+
+procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  asyncClearCanWriteCallback(Handle, Data^.FileHandle);
+  FreeNotifyData(Self, Data);
+end;
+
 function TEventLoop.TimerTicks: Int64;
 begin
   Result := asyncGetTicks;
@@ -187,9 +253,164 @@ begin
 end;
 
 
+// -------------------------------------------------------------------
+//   TWriteBuffer
+// -------------------------------------------------------------------
+
+procedure TWriteBuffer.BufferEmpty;
+begin
+  if Assigned(FOnBufferEmpty) then
+    FOnBufferEmpty(Self);
+end;
+
+constructor TWriteBuffer.Create;
+begin
+  inherited Create;
+
+  FBuffer := nil;
+  FBytesInBuffer := 0;
+  EndOfLineMarker := #10;
+end;
+
+destructor TWriteBuffer.Destroy;
+begin
+  if Assigned(FBuffer) then
+    FreeMem(FBuffer);
+  inherited Destroy;
+end;
+
+function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
+begin
+  if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
+     ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
+    Result := FBytesInBuffer
+  else
+    // !!!: No i18n for this string - solve this problem in the FCL?!?
+    raise EStreamError.Create('Invalid stream operation');
+end;
+
+function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
+begin
+  ReallocMem(FBuffer, FBytesInBuffer + Count);
+  Move(ABuffer, FBuffer[FBytesInBuffer], Count);
+  Inc(FBytesInBuffer, Count);
+  WantWrite;
+  Result := Count;
+end;
+
+procedure TWriteBuffer.WriteLine(const line: String);
+var
+  s: String;
+begin
+  s := line + EndOfLineMarker;
+  WriteBuffer(s[1], Length(s));
+end;
+
+procedure TWriteBuffer.Run;
+var
+  CurStart, Written: Integer;
+  NewBuf: PChar;
+  Failed: Boolean;
+begin
+  CurStart := 0;
+  Failed := True;
+  repeat
+    if FBytesInBuffer = 0 then
+    begin
+      BufferEmpty;
+      exit;
+    end;
+
+    Written := DoRealWrite(FBuffer[CurStart], FBytesInBuffer - CurStart);
+    if Written > 0 then
+    begin
+      Inc(CurStart, Written);
+      Failed := False;
+      GetMem(NewBuf, FBytesInBuffer - CurStart);
+      Move(FBuffer[CurStart], NewBuf[0], FBytesInBuffer - CurStart);
+      FreeMem(FBuffer);
+      FBuffer := NewBuf;
+      Dec(FBytesInBuffer, CurStart);
+    end;
+  until Written <= 0;
+
+  if Failed then
+    WritingFailed;
+end;
+
+
+// -------------------------------------------------------------------
+//   TAsyncWriteStream
+// -------------------------------------------------------------------
+
+function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
+begin
+  Result := FDataStream.Write(ABuffer, count);
+end;
+
+procedure TAsyncWriteStream.WritingFailed;
+begin
+  if (FDataStream <> FBlockingStream) and Assigned(FNotifyHandle) then
+  begin
+    FManager.ClearCanWriteNotify(FNotifyHandle);
+    FNotifyHandle := nil;
+  end;
+end;
+
+procedure TAsyncWriteStream.WantWrite;
+begin
+  FNotifyHandle := FManager.SetCanWriteNotify(FBlockingStream.Handle, @CanWrite, nil);
+end;
+
+procedure TAsyncWriteStream.BufferEmpty;
+begin
+  if Assigned(FNotifyHandle) then
+  begin
+    FManager.ClearCanWriteNotify(FNotifyHandle);
+    FNotifyHandle := nil;
+  end;
+  inherited BufferEmpty;
+end;
+
+procedure TAsyncWriteStream.CanWrite(UserData: TObject);
+begin
+  Run;
+end;
+
+constructor TAsyncWriteStream.Create(AManager: TEventLoop; AStream: THandleStream);
+begin
+  Self.Create(AManager, AStream, AStream);
+end;
+
+constructor TAsyncWriteStream.Create(AManager: TEventLoop;
+  ADataStream: TStream; ABlockingStream: THandleStream);
+begin
+  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
+
+  inherited Create;
+  FManager := AManager;
+  FDataStream := ADataStream;
+  FBlockingStream := ABlockingStream;
+end;
+
+destructor TAsyncWriteStream.Destroy;
+begin
+  if Assigned(FNotifyHandle) then
+    FManager.ClearCanWriteNotify(FNotifyHandle);
+  inherited Destroy;
+end;
+
+
 {
   $Log$
-  Revision 1.1.2.1  2001-09-08 15:43:24  sg
+  Revision 1.1.2.2  2001-11-16 12:51:41  sg
+  * Now different handlers for available data and space in write buffer can
+    be set
+  * LOTS of bugfixes in the implementation
+  * fpAsync now has a write buffer class (a read buffer class for reading
+    line by line will be included in the next release)
+
+  Revision 1.1.2.1  2001/09/08 15:43:24  sg
   * First public version
 
 }

+ 75 - 1
packages/asyncio/fpasync/fpasynch.inc

@@ -48,6 +48,21 @@ type
       ASender: TObject): Pointer;
     procedure ClearIONotify(AHandle: Pointer);
 
+    procedure SetDataAvailableCallback(AHandle: Integer;
+      ACallback: TAsyncCallback; AUserData: Pointer);
+    procedure ClearDataAvailableCallback(AHandle: Integer);
+    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
+      ASender: TObject): Pointer;
+    procedure ClearDataAvailableNotify(AHandle: Pointer);
+
+    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
+      AUserData: Pointer);
+    procedure ClearCanWriteCallback(AHandle: Integer);
+    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
+      ASender: TObject): Pointer;
+    procedure ClearCanWriteNotify(AHandle: Pointer);
+
+
     class function TimerTicks: Int64;
 
     // Properties
@@ -55,9 +70,68 @@ type
   end;
 
 
+// -------------------------------------------------------------------
+//   Asynchronous write buffers
+// -------------------------------------------------------------------
+
+  TWriteBuffer = class(TStream)
+  protected
+    FBuffer: PChar;
+    FBytesInBuffer: Integer;
+    FOnBufferEmpty: TNotifyEvent;
+
+    function  Seek(Offset: LongInt; Origin: Word): LongInt; override;
+    function  Write(const ABuffer; Count: LongInt): LongInt; override;
+    function  DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
+    procedure WritingFailed; virtual; abstract;
+    procedure WantWrite; virtual; abstract;
+    procedure BufferEmpty; virtual;
+  public
+    EndOfLineMarker: String;
+
+    constructor Create;
+    destructor Destroy; override;
+    procedure WriteLine(const line: String);
+    procedure Run;		// Write as many data as possible
+
+    property BytesInBuffer: Integer read FBytesInBuffer;
+    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
+  end;
+
+
+  TAsyncWriteStream = class(TWriteBuffer)
+  protected
+    FManager: TEventLoop;
+    FDataStream: TStream;
+    FBlockingStream: THandleStream;
+    FNotifyHandle: Pointer;
+
+    function  DoRealWrite(const ABuffer; Count: Integer): Integer; override;
+    procedure WritingFailed; override;
+    procedure WantWrite; override;
+    procedure BufferEmpty; override;
+    procedure CanWrite(UserData: TObject);
+  public
+    constructor Create(AManager: TEventLoop; AStream: THandleStream);
+    constructor Create(AManager: TEventLoop;
+      ADataStream: TStream; ABlockingStream: THandleStream);
+    destructor Destroy; override;
+
+    property DataStream: TStream read FDataStream;
+    property BlockingStream: THandleStream read FBlockingStream;
+  end;
+
+
 {
   $Log$
-  Revision 1.1.2.1  2001-09-08 15:43:24  sg
+  Revision 1.1.2.2  2001-11-16 12:51:41  sg
+  * Now different handlers for available data and space in write buffer can
+    be set
+  * LOTS of bugfixes in the implementation
+  * fpAsync now has a write buffer class (a read buffer class for reading
+    line by line will be included in the next release)
+
+  Revision 1.1.2.1  2001/09/08 15:43:24  sg
   * First public version
 
 }

+ 9 - 2
packages/asyncio/fpasync/unix/fpasync.pp

@@ -21,7 +21,7 @@ unit fpAsync;
 
 interface
 
-uses libasync;
+uses Classes, libasync;
 
 type
 
@@ -41,7 +41,14 @@ end.
 
 {
   $Log$
-  Revision 1.1.2.1  2001-09-08 15:43:24  sg
+  Revision 1.1.2.2  2001-11-16 12:51:41  sg
+  * Now different handlers for available data and space in write buffer can
+    be set
+  * LOTS of bugfixes in the implementation
+  * fpAsync now has a write buffer class (a read buffer class for reading
+    line by line will be included in the next release)
+
+  Revision 1.1.2.1  2001/09/08 15:43:24  sg
   * First public version
 
 }

+ 28 - 1
packages/asyncio/libasync/libasync.inc

@@ -81,10 +81,37 @@ procedure asyncClearIOCallback(
   Handle: TAsyncHandle;
   IOHandle: LongInt); cdecl;
 
+procedure asyncSetDataAvailableCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt;
+  Callback: TAsyncCallback;
+  UserData: Pointer); cdecl;
+
+procedure asyncClearDataAvailableCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt); cdecl;
+
+procedure asyncSetCanWriteCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt;
+  Callback: TAsyncCallback;
+  UserData: Pointer); cdecl;
+
+procedure asyncClearCanWriteCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt); cdecl;
+
 
 {
   $Log$
-  Revision 1.1.2.1  2001-09-08 15:43:24  sg
+  Revision 1.1.2.2  2001-11-16 12:51:41  sg
+  * Now different handlers for available data and space in write buffer can
+    be set
+  * LOTS of bugfixes in the implementation
+  * fpAsync now has a write buffer class (a read buffer class for reading
+    line by line will be included in the next release)
+
+  Revision 1.1.2.1  2001/09/08 15:43:24  sg
   * First public version
 
 }

+ 141 - 25
packages/asyncio/libasync/unix/libasync.pp

@@ -25,6 +25,7 @@ type
 
   TAsyncData = record
     IsRunning, DoBreak: Boolean;
+    HasCallbacks: Boolean;	// True as long as callbacks are set
     FirstTimer: Pointer;
     FirstIOCallback: Pointer;
     FDData: Pointer;
@@ -70,10 +71,12 @@ procedure InitIOCallback(Handle: TAsyncHandle; IOHandle: LongInt;
 var
   Data: PIOCallbackData;
   i: LongInt;
+  NeedData: Boolean;
 begin
   if IOHandle > MaxHandle then
     exit;
 
+  NeedData := True;
   Data := Handle^.Data.FirstIOCallback;
   while Assigned(Data) do
   begin
@@ -89,34 +92,40 @@ begin
         Data^.WriteCallback := WriteCallback;
 	Data^.WriteUserData := WriteUserData;
       end;
-      exit;
+      NeedData := False;
+      break;
     end;
     Data := Data^.Next;
   end;
 
-  New(Data);
-  Data^.Next := Handle^.Data.FirstIOCallback;
-  Handle^.Data.FirstIOCallback := Data;
-  Data^.IOHandle := IOHandle;
-  if ARead then
+  if NeedData then
   begin
-    Data^.ReadCallback := ReadCallback;
-    Data^.ReadUserData := ReadUserData;
-  end;
-  if AWrite then
-  begin
-    Data^.WriteCallback := WriteCallback;
-    Data^.WriteUserData := WriteUserData;
-  end;
+    New(Data);
+    Data^.Next := Handle^.Data.FirstIOCallback;
+    Handle^.Data.FirstIOCallback := Data;
+    Data^.IOHandle := IOHandle;
+    if ARead then
+    begin
+      Data^.ReadCallback := ReadCallback;
+      Data^.ReadUserData := ReadUserData;
+    end else
+      Data^.ReadCallback := nil;
+    if AWrite then
+    begin
+      Data^.WriteCallback := WriteCallback;
+      Data^.WriteUserData := WriteUserData;
+    end else
+      Data^.WriteCallback := nil;
 
-  if not Assigned(Handle^.Data.FDData) then
-  begin
-    GetMem(Handle^.Data.FDData, SizeOf(TFDSet) * 2);
-    FD_Zero(PFDSet(Handle^.Data.FDData)[0]);
-    FD_Zero(PFDSet(Handle^.Data.FDData)[1]);
+    if not Assigned(Handle^.Data.FDData) then
+    begin
+      GetMem(Handle^.Data.FDData, SizeOf(TFDSet) * 2);
+      FD_Zero(PFDSet(Handle^.Data.FDData)[0]);
+      FD_Zero(PFDSet(Handle^.Data.FDData)[1]);
+    end;
+    if IOHandle > Handle^.Data.HighestHandle then
+      Handle^.Data.HighestHandle := IOHandle;
   end;
-  if IOHandle > Handle^.Data.HighestHandle then
-    Handle^.Data.HighestHandle := IOHandle;
 
   Data^.SavedHandleFlags := fcntl(IOHandle, F_GetFl);
   fcntl(IOHandle, F_SetFl, Data^.SavedHandleFlags or Open_NonBlock);
@@ -145,8 +154,19 @@ begin
 	  FD_Set(IOHandle, PFDSet(Handle^.Data.FDData)[1]);
       end;
   end;
+
+  Handle^.Data.HasCallbacks := True;
 end;
 
+procedure CheckForCallbacks(Handle: TAsyncHandle);
+var
+  Data: PIOCallbackData;
+begin
+  if (Handle^.Data.HasCallbacks) and
+    (not Assigned(Handle^.Data.FirstIOCallback)) and
+    (not Assigned(Handle^.Data.FirstTimer)) then
+    Handle^.Data.HasCallbacks := False;
+end;
 
 
 
@@ -190,6 +210,9 @@ var
   CurReadFDSet, CurWriteFDSet: TFDSet;
   IOCallback: PIOCallbackData;
 begin
+  if Handle^.Data.IsRunning then
+    exit;
+
   Handle^.Data.DoBreak := False;
   Handle^.Data.IsRunning := True;
 
@@ -205,7 +228,7 @@ begin
     end;
   end;
 
-  while not Handle^.Data.DoBreak do
+  while (not Handle^.Data.DoBreak) and Handle^.Data.HasCallbacks do
   begin
     Timer := Handle^.Data.FirstTimer;
     if Assigned(Handle^.Data.FirstTimer) then
@@ -328,6 +351,8 @@ begin
   Data^.UserData := UserData;
   if Handle^.Data.IsRunning then
     Data^.NextTick := asyncGetTicks + MSec;
+
+  Handle^.Data.HasCallbacks := True;
 end;
 
 procedure asyncRemoveTimer(
@@ -354,6 +379,7 @@ begin
     CurData := NextData;
   end;
   Dispose(Data);
+  CheckForCallbacks(Handle);
 end;
 
 procedure asyncSetIOCallback(
@@ -371,23 +397,106 @@ procedure asyncClearIOCallback(Handle: TAsyncHandle;
 var
   CurData, PrevData, NextData: PIOCallbackData;
 begin
-  CurData := Handle^.Data.FirstTimer;
+  CurData := Handle^.Data.FirstIOCallback;
   PrevData := nil;
   while Assigned(CurData) do
   begin
     NextData := CurData^.Next;
     if CurData^.IOHandle = IOHandle then
     begin
+      FD_Clr(IOHandle, PFDSet(Handle^.Data.FDData)[0]);
+      FD_Clr(IOHandle, PFDSet(Handle^.Data.FDData)[1]);
       if Assigned(PrevData) then
         PrevData^.Next := NextData
       else
-        Handle^.Data.FirstTimer := NextData;
+        Handle^.Data.FirstIOCallback := NextData;
       Dispose(CurData);
       break;
     end;
     PrevData := CurData;
     CurData := NextData;
   end;
+  CheckForCallbacks(Handle);
+end;
+
+procedure asyncSetDataAvailableCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt;
+  Callback: TAsyncCallback;
+  UserData: Pointer); cdecl;
+begin
+  InitIOCallback(Handle, IOHandle, True, Callback, UserData, False, nil, nil);
+end;
+
+procedure asyncClearDataAvailableCallback(Handle: TAsyncHandle;
+  IOHandle: LongInt); cdecl;
+var
+  CurData, PrevData, NextData: PIOCallbackData;
+begin
+  CurData := Handle^.Data.FirstIOCallback;
+  PrevData := nil;
+  while Assigned(CurData) do
+  begin
+    NextData := CurData^.Next;
+    if CurData^.IOHandle = IOHandle then
+    begin
+      FD_Clr(IOHandle, PFDSet(Handle^.Data.FDData)[0]);
+      if Assigned(CurData^.WriteCallback) then
+        CurData^.ReadCallback := nil
+      else
+      begin
+        if Assigned(PrevData) then
+          PrevData^.Next := NextData
+        else
+          Handle^.Data.FirstIOCallback := NextData;
+        Dispose(CurData);
+      end;
+      break;
+    end;
+    PrevData := CurData;
+    CurData := NextData;
+  end;
+  CheckForCallbacks(Handle);
+end;
+
+procedure asyncSetCanWriteCallback(
+  Handle: TAsyncHandle;
+  IOHandle: LongInt;
+  Callback: TAsyncCallback;
+  UserData: Pointer); cdecl;
+begin
+  InitIOCallback(Handle, IOHandle, False, nil, nil, True, Callback, UserData);
+end;
+
+procedure asyncClearCanWriteCallback(Handle: TAsyncHandle;
+  IOHandle: LongInt); cdecl;
+var
+  CurData, PrevData, NextData: PIOCallbackData;
+begin
+  CurData := Handle^.Data.FirstIOCallback;
+  PrevData := nil;
+  while Assigned(CurData) do
+  begin
+    NextData := CurData^.Next;
+    if CurData^.IOHandle = IOHandle then
+    begin
+      FD_Clr(IOHandle, PFDSet(Handle^.Data.FDData)[1]);
+      if Assigned(CurData^.ReadCallback) then
+        CurData^.WriteCallback := nil
+      else
+      begin
+        if Assigned(PrevData) then
+          PrevData^.Next := NextData
+        else
+          Handle^.Data.FirstIOCallback := NextData;
+        Dispose(CurData);
+      end;
+      break;
+    end;
+    PrevData := CurData;
+    CurData := NextData;
+  end;
+  CheckForCallbacks(Handle);
 end;
 
 
@@ -396,7 +505,14 @@ end.
 
 {
   $Log$
-  Revision 1.1.2.1  2001-09-08 15:43:24  sg
+  Revision 1.1.2.2  2001-11-16 12:51:41  sg
+  * Now different handlers for available data and space in write buffer can
+    be set
+  * LOTS of bugfixes in the implementation
+  * fpAsync now has a write buffer class (a read buffer class for reading
+    line by line will be included in the next release)
+
+  Revision 1.1.2.1  2001/09/08 15:43:24  sg
   * First public version
 
 }