Browse Source

* Added ability to write all write buffer data to an debug stream
* Added TAsyncWriteStream.StopAndFree

sg 23 years ago
parent
commit
6ad0fac76b
1 changed files with 831 additions and 0 deletions
  1. 831 0
      fcl/inc/fpasync.pp

+ 831 - 0
fcl/inc/fpasync.pp

@@ -0,0 +1,831 @@
+{
+    $Id$
+
+    fpAsync: Asynchronous event management for Free Pascal
+    Copyright (C) 2001-2003 by
+      Areca Systems GmbH / Sebastian Guenther, [email protected]
+
+    See the file COPYING.FPC, included in this distribution,
+    for details about the copyright.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+}
+
+unit fpAsync;
+
+{$MODE objfpc}
+{$H+}
+
+interface
+
+uses SysUtils, Classes, libasync;
+
+type
+
+  TNotifyEvent = procedure(Sender: TObject) of object;
+
+  EAsyncError = class(Exception)
+  private
+    FErrorCode: TAsyncResult;
+  public
+    constructor Create(AErrorCode: TAsyncResult);
+    property ErrorCode: TAsyncResult read FErrorCode;
+  end;
+
+  TEventLoop = class
+  private
+    FData: TAsyncData;
+    FFirstNotifyData: Pointer;
+    function GetIsRunning: Boolean;
+    procedure SetIsRunning(AIsRunning: Boolean);
+  protected
+    procedure CheckResult(AResultCode: TAsyncResult);
+  public
+    constructor Create;
+    destructor Destroy; override;
+    function Handle: TAsyncHandle;
+
+    // Main loop control
+    procedure Run;
+    procedure Break;
+
+    // Timer support
+    function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
+      ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
+    procedure RemoveTimerCallback(ATimer: TAsyncTimer);
+    function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
+      ANotify: TNotifyEvent; ASender: TObject): Pointer;
+    procedure RemoveTimerNotify(AHandle: Pointer);
+
+    // I/O notification support (for files, sockets etc.)
+    procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
+      AUserData: Pointer);
+    procedure ClearIOCallback(AHandle: Integer);
+    function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
+      ASender: TObject): Pointer;
+    procedure ClearIONotify(AHandle: Pointer);
+
+    procedure SetDataAvailableCallback(AHandle: Integer;
+      ACallback: TAsyncCallback; AUserData: Pointer);
+    procedure ClearDataAvailableCallback(AHandle: Integer);
+    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
+      ASender: TObject): Pointer;
+    procedure ClearDataAvailableNotify(AHandle: Pointer);
+
+    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
+      AUserData: Pointer);
+    procedure ClearCanWriteCallback(AHandle: Integer);
+    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
+      ASender: TObject): Pointer;
+    procedure ClearCanWriteNotify(AHandle: Pointer);
+
+
+    class function TimerTicks: Int64;
+
+    // Properties
+    property IsRunning: Boolean read GetIsRunning write SetIsRunning;
+  end;
+
+
+// -------------------------------------------------------------------
+//   Asynchronous line reader
+// -------------------------------------------------------------------
+
+  TLineNotify = procedure(const ALine: String) of object;
+
+  TGenericLineReader = class
+  protected
+    RealBuffer, FBuffer: PChar;
+    FBytesInBuffer: Integer;
+    FOnLine: TLineNotify;
+
+    function  Read(var ABuffer; count: Integer): Integer; virtual; abstract;
+    procedure NoData; virtual; abstract;
+
+  public
+    destructor Destroy; override;
+    procedure Run;		// Process as many lines as possible
+
+    property Buffer: PChar read FBuffer;
+    property BytesInBuffer: Integer read FBytesInBuffer;
+    property OnLine: TLineNotify read FOnLine write FOnLine;
+  end;
+
+  TAsyncStreamLineReader = class(TGenericLineReader)
+  protected
+    FEventLoop: TEventLoop;
+    FDataStream: TStream;
+    FBlockingStream: THandleStream;
+    FOnEOF: TNotifyEvent;
+    NotifyHandle: Pointer;
+    DoStopAndFree: Boolean;
+
+    function  Read(var ABuffer; count: Integer): Integer; override;
+    procedure NoData; override;
+    procedure StreamDataAvailable(UserData: TObject);
+  public
+    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
+    constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
+      ABlockingStream: THandleStream);
+    destructor Destroy; override;
+    procedure StopAndFree;	// Destroy instance after run
+
+    property EventLoop: TEventLoop read FEventLoop;
+    property DataStream: TStream read FDataStream;
+    property BlockingStream: THandleStream read FBlockingStream;
+    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
+  end;
+
+
+// -------------------------------------------------------------------
+//   Asynchronous write buffers
+// -------------------------------------------------------------------
+
+  TWriteBuffer = class(TStream)
+  protected
+    FBuffer: PChar;
+    FBytesInBuffer: Integer;
+    FOnBufferEmpty: TNotifyEvent;
+
+    function  Seek(Offset: LongInt; Origin: Word): LongInt; override;
+    function  Write(const ABuffer; Count: LongInt): LongInt; override;
+    function  DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
+    procedure WritingFailed; virtual; abstract;
+    procedure WantWrite; virtual; abstract;
+    procedure BufferEmpty; virtual;
+  public
+    EndOfLineMarker: String;
+
+    constructor Create;
+    destructor Destroy; override;
+    procedure WriteLine(const line: String);
+    procedure Run;              // Write as many data as possible
+
+    property BytesInBuffer: Integer read FBytesInBuffer;
+    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
+  end;
+
+
+  TAsyncWriteStream = class(TWriteBuffer)
+  protected
+    FEventLoop: TEventLoop;
+    FDataStream: TStream;
+    FBlockingStream: THandleStream;
+    NotifyHandle: Pointer;
+    DoStopAndFree: Boolean;
+
+    function  DoRealWrite(const ABuffer; Count: Integer): Integer; override;
+    procedure WritingFailed; override;
+    procedure WantWrite; override;
+    procedure BufferEmpty; override;
+    procedure CanWrite(UserData: TObject);
+  public
+    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
+    constructor Create(AEventLoop: TEventLoop;
+      ADataStream: TStream; ABlockingStream: THandleStream);
+    destructor Destroy; override;
+    procedure StopAndFree;	// Destroy instance after run
+
+    property EventLoop: TEventLoop read FEventLoop;
+    property DataStream: TStream read FDataStream;
+    property BlockingStream: THandleStream read FBlockingStream;
+  end;
+
+
+var
+  { All data written to a TWriteBuffer or descendant class will be written to
+    this stream as well: }
+  fpAsyncWriteBufferDebugStream: TStream;
+
+
+implementation
+
+type
+  PNotifyData = ^TNotifyData;
+  TNotifyData = record
+    Next: PNotifyData;
+    Notify: TNotifyEvent;
+    Sender: TObject;
+    case Boolean of
+      False: (TimerHandle: TAsyncTimer);
+      True: (FileHandle: LongInt);
+  end;
+
+
+procedure EventHandler(Data: Pointer); cdecl;
+begin
+  with PNotifyData(Data)^ do
+    Notify(Sender);
+end;
+
+
+function AddNotifyData(Obj: TEventLoop): PNotifyData;
+begin
+  New(Result);
+  Result^.Next := PNotifyData(Obj.FFirstNotifyData);
+  Obj.FFirstNotifyData := Result;
+end;
+
+procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
+var
+  CurData, PrevData, NextData: PNotifyData;
+begin
+  PrevData := nil;
+  CurData := Obj.FFirstNotifyData;
+  while Assigned(CurData) do
+  begin
+    NextData := CurData^.Next;
+    if CurData = Data then
+      if Assigned(PrevData) then
+        PrevData^.Next := NextData
+      else
+        Obj.FFirstNotifyData := NextData;
+    PrevData := CurData;
+    CurData := NextData;
+  end;
+
+  Dispose(Data);
+end;
+
+
+constructor EAsyncError.Create(AErrorCode: TAsyncResult);
+begin
+  inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)]));
+  FErrorCode := AErrorCode;
+end;
+
+
+constructor TEventLoop.Create;
+begin
+  asyncInit(Handle);
+end;
+
+destructor TEventLoop.Destroy;
+var
+  NotifyData, NextNotifyData: PNotifyData;
+begin
+  asyncFree(Handle);
+  NotifyData := FFirstNotifyData;
+  while Assigned(NotifyData) do
+  begin
+    NextNotifyData := NotifyData^.Next;
+    Dispose(NotifyData);
+    NotifyData := NextNotifyData;
+  end;
+end;
+
+function TEventLoop.Handle: TAsyncHandle;
+begin
+  Result := TAsyncHandle(Self);
+end;
+
+procedure TEventLoop.Run;
+begin
+  asyncRun(Handle);
+end;
+
+procedure TEventLoop.Break;
+begin
+  asyncBreak(Handle);
+end;
+
+function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
+  ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
+begin
+  Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
+end;
+
+procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
+begin
+  asyncRemoveTimer(Handle, ATimer);
+end;
+
+function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
+  ANotify: TNotifyEvent; ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.TimerHandle :=
+    asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
+end;
+
+procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  asyncRemoveTimer(Handle, Data^.TimerHandle);
+  FreeNotifyData(Self, Data);
+end;
+
+procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
+  AUserData: Pointer);
+begin
+  CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData));
+end;
+
+procedure TEventLoop.ClearIOCallback(AHandle: Integer);
+begin
+  asyncClearIOCallback(Handle, AHandle);
+end;
+
+function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
+  ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+  ResultCode: TAsyncResult;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.FileHandle := AHandle;
+  ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData);
+  if ResultCode <> asyncOK then
+  begin
+    FreeNotifyData(Self, UserData);
+    raise EAsyncError.Create(ResultCode);
+  end else
+    Result := UserData;
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
+end;
+
+procedure TEventLoop.ClearIONotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
+  asyncClearIOCallback(Handle, Data^.FileHandle);
+  FreeNotifyData(Self, Data);
+end;
+
+procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
+  AUserData: Pointer);
+begin
+  CheckResult(asyncSetDataAvailableCallback(Handle, AHandle,
+    ACallback, AUserData));
+end;
+
+procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
+begin
+  asyncClearDataAvailableCallback(Handle, AHandle);
+end;
+
+function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
+  ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+  ResultCode: TAsyncResult;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.FileHandle := AHandle;
+  ResultCode := asyncSetDataAvailableCallback(Handle, AHandle,
+    @EventHandler, UserData);
+  if ResultCode <> asyncOK then
+  begin
+    FreeNotifyData(Self, UserData);
+    raise EAsyncError.Create(ResultCode);
+  end else
+    Result := UserData;
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
+end;
+
+procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
+  asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
+  FreeNotifyData(Self, Data);
+end;
+
+procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
+  AUserData: Pointer);
+begin
+  CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData));
+end;
+
+procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
+begin
+  asyncClearCanWriteCallback(Handle, AHandle);
+end;
+
+function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
+  ASender: TObject): Pointer;
+var
+  UserData: PNotifyData;
+  ResultCode: TAsyncResult;
+begin
+  UserData := AddNotifyData(Self);
+  UserData^.Notify := ANotify;
+  UserData^.Sender := ASender;
+  UserData^.FileHandle := AHandle;
+  ResultCode := asyncSetCanWriteCallback(Handle, AHandle,
+    @EventHandler, UserData);
+  if ResultCode <> asyncOK then
+  begin
+    FreeNotifyData(Self, UserData);
+    raise EAsyncError.Create(ResultCode);
+  end else
+    Result := UserData;
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
+end;
+
+procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
+var
+  Data: PNotifyData;
+begin
+  Data := PNotifyData(AHandle);
+  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
+  asyncClearCanWriteCallback(Handle, Data^.FileHandle);
+  FreeNotifyData(Self, Data);
+end;
+
+class function TEventLoop.TimerTicks: Int64;
+begin
+  Result := asyncGetTicks;
+end;
+
+procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
+begin
+  if AResultCode <> asyncOK then
+    raise EAsyncError.Create(AResultCode);
+end;
+
+function TEventLoop.GetIsRunning: Boolean;
+begin
+  Result := asyncIsRunning(Handle);
+end;
+
+procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
+begin
+  if IsRunning then
+  begin
+    if not AIsRunning then
+      Run;
+  end else
+    if AIsRunning then
+      Break;
+end;
+
+
+// -------------------------------------------------------------------
+//   TGenericLineReader
+// -------------------------------------------------------------------
+
+destructor TGenericLineReader.Destroy;
+begin
+  if Assigned(RealBuffer) then
+  begin
+    FreeMem(RealBuffer);
+    RealBuffer := nil;
+  end;
+  inherited Destroy;
+end;
+
+procedure TGenericLineReader.Run;
+var
+  NewData: array[0..1023] of Byte;
+  p: PChar;
+  BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
+  line: String;
+  FirstRun: Boolean;
+begin
+  FirstRun := True;
+  while True do
+  begin
+    BytesRead := Read(NewData, SizeOf(NewData));
+    //WriteLn('Linereader: ', BytesRead, ' bytes read');
+    if BytesRead <= 0 then begin
+      if FirstRun then
+        NoData;
+      break;
+    end;
+    FirstRun := False;
+    OldBufSize := FBytesInBuffer;
+
+    // Append the new received data to the read buffer
+    Inc(FBytesInBuffer, BytesRead);
+    ReallocMem(RealBuffer, FBytesInBuffer);
+    Move(NewData, RealBuffer[OldBufSize], BytesRead);
+
+    {Process all potential lines in the current buffer. Attention: FBuffer and
+     FBytesInBuffer MUST be updated for each line, as they can be accessed from
+     within the FOnLine handler!}
+    LastEndOfLine := 0;
+    if OldBufSize > 0 then
+      i := OldBufSize - 1
+    else
+      i := 0;
+
+    CurBytesInBuffer := FBytesInBuffer;
+
+    while i <= CurBytesInBuffer - 2 do
+    begin
+      if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
+      begin
+        LineLength := i - LastEndOfLine;
+        SetLength(line, LineLength);
+        if LineLength > 0 then
+          Move(RealBuffer[LastEndOfLine], line[1], LineLength);
+
+        if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
+           ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
+          Inc(i);
+        LastEndOfLine := i + 1;
+
+        if Assigned(FOnLine) then begin
+          FBuffer := RealBuffer + LastEndOfLine;
+          FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
+          FOnLine(line);
+          // Check if <this> has been destroyed by FOnLine:
+          if not Assigned(FBuffer) then exit;
+        end;
+      end;
+      Inc(i);
+    end;
+
+    FBytesInBuffer := CurBytesInBuffer;
+
+    if LastEndOfLine > 0 then
+    begin
+      // Remove all processed lines from the buffer
+      Dec(FBytesInBuffer, LastEndOfLine);
+      GetMem(p, FBytesInBuffer);
+      Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
+      if Assigned(RealBuffer) then
+        FreeMem(RealBuffer);
+      RealBuffer := p;
+    end;
+    FBuffer := RealBuffer;
+  end;
+end;
+
+
+// -------------------------------------------------------------------
+//   TAsyncStreamLineReader
+// -------------------------------------------------------------------
+
+constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
+  AStream: THandleStream);
+begin
+  Self.Create(AEventLoop, AStream, AStream);
+end;
+
+constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
+  ADataStream: TStream; ABlockingStream: THandleStream);
+begin
+  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
+
+  inherited Create;
+  FEventLoop := AEventLoop;
+  FDataStream := ADataStream;
+  FBlockingStream := ABlockingStream;
+  NotifyHandle := EventLoop.SetDataAvailableNotify(
+    FBlockingStream.Handle, @StreamDataAvailable, nil);
+end;
+
+destructor TAsyncStreamLineReader.Destroy;
+begin
+  inherited Destroy;
+end;
+
+procedure TAsyncStreamLineReader.StopAndFree;
+begin
+  if Assigned(NotifyHandle) then
+  begin
+    EventLoop.ClearDataAvailableNotify(NotifyHandle);
+    NotifyHandle := nil;
+  end;
+  DoStopAndFree := True;
+end;
+
+function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
+begin
+  Result := FDataStream.Read(ABuffer, count);
+end;
+
+procedure TAsyncStreamLineReader.NoData;
+var
+  s: String;
+begin
+  if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then begin
+
+    if (FBytesInBuffer > 0) and Assigned(FOnLine) then begin
+      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
+        Dec(FBytesInBuffer);
+      SetLength(s, FBytesInBuffer);
+      Move(FBuffer^, s[1], FBytesInBuffer);
+      FOnLine(s);
+    end;
+
+    EventLoop.ClearDataAvailableNotify(NotifyHandle);
+    NotifyHandle := nil;
+    if Assigned(FOnEOF) then
+      FOnEOF(Self);
+  end;
+end;
+
+procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
+begin
+  Run;
+  if DoStopAndFree then
+    Free;
+end;
+
+
+// -------------------------------------------------------------------
+//   TWriteBuffer
+// -------------------------------------------------------------------
+
+procedure TWriteBuffer.BufferEmpty;
+begin
+  if Assigned(FOnBufferEmpty) then
+    FOnBufferEmpty(Self);
+end;
+
+constructor TWriteBuffer.Create;
+begin
+  inherited Create;
+
+  FBuffer := nil;
+  FBytesInBuffer := 0;
+  EndOfLineMarker := #10;
+end;
+
+destructor TWriteBuffer.Destroy;
+begin
+  if Assigned(FBuffer) then
+    FreeMem(FBuffer);
+  inherited Destroy;
+end;
+
+function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
+begin
+  if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
+     ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
+    Result := FBytesInBuffer
+  else
+    // !!!: No i18n for this string - solve this problem in the FCL?!?
+    raise EStreamError.Create('Invalid stream operation');
+end;
+
+function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
+begin
+  ReallocMem(FBuffer, FBytesInBuffer + Count);
+  Move(ABuffer, FBuffer[FBytesInBuffer], Count);
+  Inc(FBytesInBuffer, Count);
+  if Assigned(fpAsyncWriteBufferDebugStream) then
+    fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
+  WantWrite;
+  Result := Count;
+end;
+
+procedure TWriteBuffer.WriteLine(const line: String);
+var
+  s: String;
+begin
+  s := line + EndOfLineMarker;
+  WriteBuffer(s[1], Length(s));
+end;
+
+procedure TWriteBuffer.Run;
+var
+  CurStart, Written: Integer;
+  NewBuf: PChar;
+  Failed: Boolean;
+begin
+  CurStart := 0;
+  Failed := True;
+  repeat
+    if FBytesInBuffer = 0 then
+    begin
+      BufferEmpty;
+      exit;
+    end;
+
+    Written := DoRealWrite(FBuffer[CurStart], FBytesInBuffer - CurStart);
+    if Written > 0 then
+    begin
+      Inc(CurStart, Written);
+      Failed := False;
+      GetMem(NewBuf, FBytesInBuffer - CurStart);
+      Move(FBuffer[CurStart], NewBuf[0], FBytesInBuffer - CurStart);
+      FreeMem(FBuffer);
+      FBuffer := NewBuf;
+      Dec(FBytesInBuffer, CurStart);
+    end;
+  until Written <= 0;
+
+  if Failed then
+    WritingFailed;
+end;
+
+
+// -------------------------------------------------------------------
+//   TAsyncWriteStream
+// -------------------------------------------------------------------
+
+function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
+begin
+  Result := FDataStream.Write(ABuffer, count);
+end;
+
+procedure TAsyncWriteStream.WritingFailed;
+begin
+  if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then
+  begin
+    EventLoop.ClearCanWriteNotify(NotifyHandle);
+    NotifyHandle := nil;
+  end;
+end;
+
+procedure TAsyncWriteStream.WantWrite;
+begin
+  if not Assigned(NotifyHandle) then
+    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
+      @CanWrite, nil);
+end;
+
+procedure TAsyncWriteStream.BufferEmpty;
+begin
+  if Assigned(NotifyHandle) then
+  begin
+    EventLoop.ClearCanWriteNotify(NotifyHandle);
+    NotifyHandle := nil;
+  end;
+  inherited BufferEmpty;
+end;
+
+procedure TAsyncWriteStream.CanWrite(UserData: TObject);
+begin
+  Run;
+  if DoStopAndFree then
+    Free;
+end;
+
+constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
+  AStream: THandleStream);
+begin
+  Self.Create(AEventLoop, AStream, AStream);
+end;
+
+constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
+  ADataStream: TStream; ABlockingStream: THandleStream);
+begin
+  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
+
+  inherited Create;
+  FEventLoop := AEventLoop;
+  FDataStream := ADataStream;
+  FBlockingStream := ABlockingStream;
+end;
+
+destructor TAsyncWriteStream.Destroy;
+begin
+  if Assigned(NotifyHandle) then
+    EventLoop.ClearCanWriteNotify(NotifyHandle);
+  inherited Destroy;
+end;
+
+procedure TAsyncWriteStream.StopAndFree;
+begin
+  if Assigned(NotifyHandle) then
+  begin
+    EventLoop.ClearCanWriteNotify(NotifyHandle);
+    NotifyHandle := nil;
+  end;
+  DoStopAndFree := True;
+end;
+
+
+end.
+
+
+{
+  $Log$
+  Revision 1.2  2002-04-25 19:12:27  sg
+  * Added ability to write all write buffer data to an debug stream
+  * Added TAsyncWriteStream.StopAndFree
+
+  Revision 1.1  2003/03/17 22:25:32  michael
+  + Async moved from package to FCL
+
+  Revision 1.3  2002/09/15 15:45:38  sg
+  * Added stream line reader classes
+
+  Revision 1.2  2002/09/07 15:42:57  peter
+    * old logs removed and tabs fixed
+
+  Revision 1.1  2002/01/29 17:55:02  peter
+    * splitted to base and extra
+
+}