{
    $Id$

    Async_IO: Managing class for asynchronous input/output
    Copyright (C) 2000 by Sebastian Guenther ([email protected])

    See the file COPYING.FPC, included in this distribution,
    for details about the copyright.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
}

{$MODE objfpc}
{$H+}

unit AsyncIO;

interface

// NOTE(review): TIOdata, MaxHandle and the stream classes used below are not
// declared in this file; presumably they come from this include - confirm.
{$i asyncioh.inc}

type

  TAsyncIOManager = class;

  // Callback signature used for read, write and timeout notifications.
  TAsyncIONotify = procedure(UserData: TObject) of object;

  // A callback together with the user data object that is passed to it.
  TAsyncIONotifyInfo = record
    Method: TAsyncIONotify;
    UserData: TObject;
  end;


// -------------------------------------------------------------------
//   TAsyncIOManager
// -------------------------------------------------------------------

  // Keeps per-handle read/write notifications plus one timeout notification.
  // Only ExecuteNotify and the timeout handler accessors are implemented in
  // this file; the remaining methods are presumably implemented in
  // asyncio.inc (not visible here).
  TAsyncIOManager = class
  protected
    DoBreak: Boolean;
    IOdata : TIOdata;
    // Notification slots, indexed by handle number
    ReadNotifies, WriteNotifies: array[0..MaxHandle] of TAsyncIONotifyInfo;
    HighestHandle: Integer;
    FTimeout: Integer;
    TimeoutNotify: TAsyncIONotifyInfo;
    procedure CalcHighestHandle(max: Integer);
    procedure ExecuteNotify(const Notify: TAsyncIONotifyInfo);
    function GetHandleAsync(AHandle: Integer): Boolean;
    procedure SetHandleAsync(AHandle: Integer; AValue: Boolean);
  public
    constructor Create;
    procedure Run;
    procedure BreakRun;

    procedure SetReadHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
    procedure ClearReadHandler(AHandle: Integer);
    function GetReadHandler(AHandle: Integer): TAsyncIONotify;

    procedure SetWriteHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
    procedure ClearWriteHandler(AHandle: Integer);
    function GetWriteHandler(AHandle: Integer): TAsyncIONotify;

    procedure SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
    procedure ClearTimeoutHandler;
    function GetTimeoutHandler: TAsyncIONotify;

    // NOTE(review): the unit of Timeout is not visible in this file - confirm.
    property Timeout: Integer read FTimeout write FTimeout;
    property HandleAsync[AHandle: Integer]: Boolean read GetHandleAsync write SetHandleAsync;
  end;


// -------------------------------------------------------------------
//   Line reader classes
// -------------------------------------------------------------------

  TLineNotify = procedure(const line: String) of object;

  PBoolean = ^Boolean;

  // Splits incoming data into lines and reports each line through OnLine.
  // Descendants supply the data source (Read) and the reaction to an
  // unsuccessful first read (NoData).
  TGenericLineReader = class
  protected
    // While a line handler is being called this points to a local Boolean of
    // the caller; the destructor sets that Boolean so the caller can detect
    // that the handler destroyed the reader.
    FDestroyedFlag: PBoolean;
    // RealBuffer owns the allocation; FBuffer is the position exposed to
    // line handlers through the Buffer property.
    RealBuffer, FBuffer: PChar;
    FBytesInBuffer: Integer;
    FOnLine: TLineNotify;

    function  Read(var ABuffer; count: Integer): Integer; virtual; abstract;
    procedure NoData; virtual; abstract;

  public
    destructor Destroy; override;
    procedure Run;              // Process as many lines as possible

    property Buffer: PChar read FBuffer;
    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnLine: TLineNotify read FOnLine write FOnLine;
  end;

  // Line reader which takes its data from a stream and registers itself as
  // read handler for the handle of a (possibly different) handle stream.
  TAsyncStreamLineReader = class(TGenericLineReader)
  protected
    FManager: TAsyncIOManager;
    FDataStream: TStream;
    FBlockingStream: THandleStream;
    FOnEOF: TNotifyEvent;

    function  Read(var ABuffer; count: Integer): Integer; override;
    procedure NoData; override;
    procedure StreamDataAvailable(UserData: TObject);
  public
    constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
    constructor Create(AManager: TAsyncIOManager;
      ADataStream: TStream; ABlockingStream: THandleStream);
    destructor Destroy; override;

    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  end;


// -------------------------------------------------------------------
//   TWriteBuffer
// -------------------------------------------------------------------

  // Write-only stream which collects everything written to it in a memory
  // buffer. Run hands the buffered data to DoRealWrite, which descendants
  // implement together with WritingFailed and WantWrite.
  TWriteBuffer = class(TStream)
  protected
    FBuffer: PChar;
    FBytesInBuffer: Integer;
    FOnBufferEmpty: TNotifyEvent;

    function  Seek(Offset: LongInt; Origin: Word): LongInt; override;
    function  Write(const ABuffer; Count: LongInt): LongInt; override;
    function  DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
    procedure WritingFailed; virtual; abstract;
    procedure WantWrite; virtual; abstract;
    procedure BufferEmpty; virtual;

    constructor Create;

  public
    EndOfLineMarker: String;

    destructor Destroy; override;
    procedure WriteLine(const line: String);
    procedure Run;              // Write as many data as possible

    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  end;


  // Write buffer which sends its data to a stream and uses the manager's
  // write notification for the handle of a (possibly different) handle
  // stream to trigger Run.
  TAsyncWriteStream = class(TWriteBuffer)
  protected
    FManager: TAsyncIOManager;
    FDataStream: TStream;
    FBlockingStream: THandleStream;

    function  DoRealWrite(const ABuffer; Count: Integer): Integer; override;
    procedure WritingFailed; override;
    procedure WantWrite; override;
    procedure BufferEmpty; override;
    procedure CanWrite(UserData: TObject);
  public
    constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
    constructor Create(AManager: TAsyncIOManager;
      ADataStream: TStream; ABlockingStream: THandleStream);
    destructor Destroy; override;

    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
  end;


// ===================================================================
// ===================================================================

implementation

uses SysUtils;

{$i asyncio.inc}


// -------------------------------------------------------------------
//   TAsyncIOManager
// -------------------------------------------------------------------

// Calls the notification method with its user data, if a method is set.
procedure TAsyncIOManager.ExecuteNotify(const Notify: TAsyncIONotifyInfo);
begin
  if Assigned(Notify.Method) then
    Notify.Method(Notify.UserData);
end;

// Stores the method and user data of the timeout notification.
procedure TAsyncIOManager.SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
begin
  TimeoutNotify.Method := AMethod;
  TimeoutNotify.UserData := AUserData;
end;

// Removes the timeout notification method; the stored user data is left
// untouched.
procedure TAsyncIOManager.ClearTimeoutHandler;
begin
  TimeoutNotify.Method := nil;
end;

// Returns the currently set timeout notification method (nil if none).
function TAsyncIOManager.GetTimeoutHandler: TAsyncIONotify;
begin
  Result := TimeoutNotify.Method;
end;


// -------------------------------------------------------------------
//   TGenericLineReader
// -------------------------------------------------------------------

// Signals a pending handler call (see FDestroyedFlag) that the reader is
// gone and releases the read buffer.
destructor TGenericLineReader.Destroy;
begin
  if Assigned(FDestroyedFlag) then
    FDestroyedFlag^ := True;

  if Assigned(RealBuffer) then
  begin
    FreeMem(RealBuffer);
    RealBuffer := nil;
  end;

  inherited Destroy;
end;

// Reads data in chunks of up to 1024 bytes until Read returns no more data,
// and calls OnLine for every complete line found. #13, #10 and a #13#10
// pair inside the same buffer each terminate one line; the terminator is not
// part of the reported line. Incomplete data stays in the buffer. If the
// very first Read already returns nothing, NoData is called instead.
// The OnLine handler is allowed to destroy the reader; Run then returns
// immediately without touching the instance again.
//
// NOTE(review): a #13 at the very end of one read followed by a #10 at the
// start of the next read is reported as a line followed by an empty line,
// because the pair is only recognised within one buffer.
procedure TGenericLineReader.Run;
var
  NewData: array[0..1023] of Byte;
  p: PChar;
  BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  line: String;
  FirstRun, DestroyedFlag: Boolean;
begin
  FirstRun := True;
  DestroyedFlag := False;
  while True do
  begin
    BytesRead := Read(NewData, SizeOf(NewData));
    //WriteLn('Linereader: ', BytesRead, ' bytes read');
    if BytesRead <= 0 then begin
      if FirstRun then
        NoData;
      break;
    end;
    FirstRun := False;
    OldBufSize := FBytesInBuffer;

    // Append the new received data to the read buffer
    Inc(FBytesInBuffer, BytesRead);
    ReallocMem(RealBuffer, FBytesInBuffer);
    Move(NewData, RealBuffer[OldBufSize], BytesRead);

    {Process all potential lines in the current buffer. Attention: FBuffer and
     FBytesInBuffer MUST be updated for each line, as they can be accessed from
     within the FOnLine handler!}
    LastEndOfLine := 0;
    // Scanning restarts at the last byte that was already buffered
    if OldBufSize > 0 then
      i := OldBufSize - 1
    else
      i := 0;

    CurBytesInBuffer := FBytesInBuffer;

    while i <= CurBytesInBuffer - 1 do
    begin
      if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
      begin
        LineLength := i - LastEndOfLine;
        SetLength(line, LineLength);
        if LineLength > 0 then
          Move(RealBuffer[LastEndOfLine], line[1], LineLength);

        // Treat #13#10 as a single line terminator
        if (i < CurBytesInBuffer - 1) and (RealBuffer[i] = #13) and
          (RealBuffer[i + 1] = #10) then
          Inc(i);
        LastEndOfLine := i + 1;

        if Assigned(FOnLine) then
        begin
          // Let the handler see only the data following this line
          FBuffer := RealBuffer + LastEndOfLine;
          FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
          FDestroyedFlag := @DestroyedFlag;
          FOnLine(line);
          // Check the flag BEFORE touching any field: if the handler
          // destroyed this reader, the instance memory is already freed.
          if DestroyedFlag then
            exit;
          FDestroyedFlag := nil;
        end;
      end;
      Inc(i);
    end;

    FBytesInBuffer := CurBytesInBuffer;

    if LastEndOfLine > 0 then
    begin
      // Remove all processed lines from the buffer
      Dec(FBytesInBuffer, LastEndOfLine);
      GetMem(p, FBytesInBuffer);
      Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
      FreeMem(RealBuffer);
      RealBuffer := p;
    end;
    FBuffer := RealBuffer;
  end;
end;


// -------------------------------------------------------------------
//   TAsyncStreamLineReader
// -------------------------------------------------------------------

// Reads up to count bytes from the data stream.
function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
begin
  Result := FDataStream.Read(ABuffer, count);
end;

// Called by Run when the first read attempt returned nothing. If the data
// stream is the blocking stream itself, or the data stream is positioned at
// its end, this is handled as end of input: remaining buffered data is
// reported as a last line (without one trailing #13 or #10), the read
// handler is removed from the manager and OnEOF is fired.
// The OnLine handler may destroy the reader; in that case nothing else is
// done, as the instance must not be accessed any more.
procedure TAsyncStreamLineReader.NoData;
var
  s: String;
  DestroyedFlag: Boolean;
begin
  if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
  begin

    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
    begin
      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
        Dec(FBytesInBuffer);
      SetLength(s, FBytesInBuffer);
      // s[1] must not be referenced when the string is empty
      if FBytesInBuffer > 0 then
        Move(FBuffer^, s[1], FBytesInBuffer);

      // Same protocol as in Run: the destructor reports through this flag
      // that the handler has destroyed the reader.
      DestroyedFlag := False;
      FDestroyedFlag := @DestroyedFlag;
      FOnLine(s);
      if DestroyedFlag then
        exit;
      FDestroyedFlag := nil;
    end;

    FManager.ClearReadHandler(FBlockingStream.Handle);
    if Assigned(FOnEOF) then
      FOnEOF(Self);
  end;
end;

// Read notification callback registered with the manager; UserData is not
// used.
procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
begin
  Run;
end;

// Creates a reader which uses AStream both as data source and as the stream
// whose handle is watched.
constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager; AStream: THandleStream);
begin
  Self.Create(AManager, AStream, AStream);
end;

// Creates a reader which reads from ADataStream and registers itself as read
// handler for the handle of ABlockingStream; that handle is also switched to
// asynchronous mode via the manager's HandleAsync property.
constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager;
  ADataStream: TStream; ABlockingStream: THandleStream);
begin
  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

  inherited Create;
  FManager := AManager;
  FDataStream := ADataStream;
  FBlockingStream := ABlockingStream;
  AManager.SetReadHandler(FBlockingStream.Handle, @StreamDataAvailable, nil);
  AManager.HandleAsync[FBlockingStream.Handle] := True;
end;

// Removes the read handler from the manager, but only if the handler that is
// currently registered for the handle is still this reader's own callback.
destructor TAsyncStreamLineReader.Destroy;
var
  Handler: TMethod;
begin
  Handler := TMethod(FManager.GetReadHandler(FBlockingStream.Handle));
  if (Handler.Code = Pointer(@StreamDataAvailable)) and
    (Handler.Data = Pointer(Self)) then
    FManager.ClearReadHandler(FBlockingStream.Handle);
  inherited Destroy;
end;


// -------------------------------------------------------------------
//   TWriteBuffer
// -------------------------------------------------------------------

// Fires OnBufferEmpty, if assigned.
procedure TWriteBuffer.BufferEmpty;
begin
  if Assigned(FOnBufferEmpty) then
    FOnBufferEmpty(Self);
end;

// Starts with an empty buffer and #10 as end-of-line marker.
constructor TWriteBuffer.Create;
begin
  inherited Create;

  FBuffer := nil;
  FBytesInBuffer := 0;
  EndOfLineMarker := #10;
end;

// Releases the buffer; data which has not been written yet is discarded.
destructor TWriteBuffer.Destroy;
begin
  if Assigned(FBuffer) then
    FreeMem(FBuffer);
  inherited Destroy;
end;

// Only seeks which resolve to the current end of the buffered data are
// accepted; they return the number of buffered bytes. Every other request
// raises EStreamError.
function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
begin
  if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
     ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
    Result := FBytesInBuffer
  else
    raise EStreamError.Create('Invalid stream operation');
end;

// Appends Count bytes to the buffer, announces via WantWrite that data is
// waiting, and reports all bytes as written.
function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
begin
  ReallocMem(FBuffer, FBytesInBuffer + Count);
  Move(ABuffer, FBuffer[FBytesInBuffer], Count);
  Inc(FBytesInBuffer, Count);
  WantWrite;
  Result := Count;
end;

// Buffers the given line followed by EndOfLineMarker.
procedure TWriteBuffer.WriteLine(const line: String);
var
  s: String;
begin
  s := line + EndOfLineMarker;
  WriteBuffer(s[1], Length(s));
end;

// Passes the buffered data to DoRealWrite until either the buffer is empty
// (BufferEmpty is called then) or DoRealWrite does not accept any more data.
// If not even the first attempt wrote anything, WritingFailed is called.
// After each successful write the buffer is compacted, so the data still to
// be written always starts at offset 0.
procedure TWriteBuffer.Run;
var
  Written, Remaining: Integer;
  NewBuf: PChar;
  Failed: Boolean;
begin
  Failed := True;

  repeat
    if FBytesInBuffer = 0 then begin
      BufferEmpty;
      exit;
    end;

    // Always write from the start of the buffer: the part written in an
    // earlier iteration has already been removed from it below.
    Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
    if Written > 0 then begin
      Failed := False;
      Remaining := FBytesInBuffer - Written;
      GetMem(NewBuf, Remaining);
      Move(FBuffer[Written], NewBuf[0], Remaining);
      FreeMem(FBuffer);
      FBuffer := NewBuf;
      FBytesInBuffer := Remaining;
    end;
  until Written <= 0;

  if Failed then
    WritingFailed;
end;


// -------------------------------------------------------------------
//   TAsyncWriteStream
// -------------------------------------------------------------------

// Writes up to Count bytes to the data stream and returns the stream's
// result.
function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
begin
  Result := FDataStream.Write(ABuffer, count);
end;

// Removes the write handler if data stream and blocking stream are different
// objects; otherwise the handler stays registered.
procedure TAsyncWriteStream.WritingFailed;
begin
  if FDataStream <> FBlockingStream then
    FManager.ClearWriteHandler(FBlockingStream.Handle);
end;

// Registers CanWrite as write handler for the blocking stream's handle.
procedure TAsyncWriteStream.WantWrite;
begin
  FManager.SetWriteHandler(FBlockingStream.Handle, @CanWrite, nil);
end;

// Nothing left to write: remove the write handler, then fire OnBufferEmpty
// through the inherited implementation.
procedure TAsyncWriteStream.BufferEmpty;
begin
  FManager.ClearWriteHandler(FBlockingStream.Handle);
  inherited BufferEmpty;
end;

// Write notification callback registered with the manager; UserData is not
// used.
procedure TAsyncWriteStream.CanWrite(UserData: TObject);
begin
  Run;
end;

// Creates a write stream which uses AStream both as data sink and as the
// stream whose handle is watched.
constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager; AStream: THandleStream);
begin
  Self.Create(AManager, AStream, AStream);
end;

// Creates a write stream which writes to ADataStream and uses the handle of
// ABlockingStream for write notifications; that handle is switched to
// asynchronous mode via the manager's HandleAsync property.
constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager;
  ADataStream: TStream; ABlockingStream: THandleStream);
begin
  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

  inherited Create;
  FManager := AManager;
  FDataStream := ADataStream;
  FBlockingStream := ABlockingStream;
  AManager.HandleAsync[FBlockingStream.Handle] := True;
end;

// Removes the write handler for the blocking stream's handle from the
// manager.
destructor TAsyncWriteStream.Destroy;
begin
  FManager.ClearWriteHandler(FBlockingStream.Handle);
  inherited Destroy;
end;


end.


{
  $Log$
  Revision 1.3  2000-08-24 22:29:37  sg
  * Line reader now reports a read line after a single #10 at the end of the
    current input buffer

  Revision 1.2  2000/07/13 11:32:58  michael
  + removed logs

}