Browse Source

implement TProxyAggregateStream (Delphi compatibility)

Ondrej Pokorny 1 year ago
parent
commit
ee9c54d51a

+ 16 - 272
packages/fcl-base/src/chainstream.pp

@@ -31,12 +31,6 @@ uses
 {$ENDIF FPC_DOTTEDUNITS}
 {$ENDIF FPC_DOTTEDUNITS}
 
 
 Type
 Type
-  TChainedStreamItem = record
-    Stream : TStream;
-    Size : Int64;
-  end;
-  TChainedStreamArray = Array of TChainedStreamItem;
-
   { TChainedStream }
   { TChainedStream }
 
 
   // Stream, backed by several other streams.
   // Stream, backed by several other streams.
@@ -44,295 +38,45 @@ Type
   // When writing, the current size of the streams is kept.
   // When writing, the current size of the streams is kept.
   // i.e. the write operation overflows to the next stream, if any.
   // i.e. the write operation overflows to the next stream, if any.
 
 
-  TChainedStream = class(TStream)
-    FStreams : TChainedStreamArray;
-    FPosition : Int64;
-    FCurrentStreamIdx : Integer;
+  TChainedStream = class(TProxyAggregateStream)
   private
   private
     FOwnsStreams: Boolean;
     FOwnsStreams: Boolean;
-    function GetStream(aIndex : Integer): TStream;
     function GetStreamCount: Integer;
     function GetStreamCount: Integer;
-    function IsValidStreamIndex(aIndex: Integer): Boolean;
-  Protected
-    Function CurrentStream : TStream;
-    Function StreamSize : Int64;
-    Function NextStream : Boolean;
-    Function PrevStream : Boolean;
-    Function GetTotalSize : Int64;
-    function  GetSize: Int64; virtual;
+    procedure SetOwnsStreams(const aOwnsStreams: Boolean);
   Public
   Public
-    Constructor Create(aChain : Array of TStream; OwnsStreams : Boolean = False);
-    Destructor Destroy; override;
-    function Read(var Buffer; Count: Longint): Longint; override;
-    function Write(const Buffer; Count: Longint): Longint; override;
-    function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
+    Constructor Create(aChain : Array of TStream; aOwnsStreams : Boolean = False);
     property StreamCount : Integer Read GetStreamCount;
     property StreamCount : Integer Read GetStreamCount;
-    property Streams[aIndex : Integer] : TStream Read GetStream;
-    Property OwnsStreams : Boolean Read FOwnsStreams Write FOwnsStreams;
+    Property OwnsStreams : Boolean Read FOwnsStreams Write SetOwnsStreams;
   end;
   end;
 
 
 implementation
 implementation
 
 
-{$IFDEF FPC_DOTTEDUNITS}
-uses System.RtlConsts;
-{$ELSE FPC_DOTTEDUNITS}
-uses rtlconsts;
-{$ENDIF FPC_DOTTEDUNITS}
-
 { TChainedStream }
 { TChainedStream }
 
 
 function TChainedStream.GetStreamCount: Integer;
 function TChainedStream.GetStreamCount: Integer;
 begin
 begin
-  Result:=Length(FStreams);
-end;
-
-function TChainedStream.IsValidStreamIndex(aIndex : Integer) : Boolean;
-
-begin
-  Result:=(aIndex>=0) and (aIndex<Length(FStreams));
-end;
-
-function TChainedStream.GetStream(aIndex : Integer): TStream;
-begin
-  if not IsValidStreamIndex(aIndex) then
-    Raise EListError.CreateFmt(SListIndexError,[aIndex]);
-  Result:=FStreams[aIndex].Stream;
-end;
-
-function TChainedStream.CurrentStream: TStream;
-begin
-  if IsValidStreamIndex(FCurrentStreamIdx) then
-    Result:=FStreams[FCurrentStreamIdx].Stream
-  else
-    Result:=Nil;
+  Result:=Count;
 end;
 end;
 
 
-function TChainedStream.StreamSize: Int64;
-begin
-  if IsValidStreamIndex(FCurrentStreamIdx) then
-    begin
-    if FStreams[FCurrentStreamIdx].Size=-1 then
-      FStreams[FCurrentStreamIdx].Size:=FStreams[FCurrentStreamIdx].Stream.Size;
-    Result:=FStreams[FCurrentStreamIdx].Size;
-    end
-  else
-    Result:=0;
-end;
-
-function TChainedStream.NextStream: Boolean;
-begin
-  Inc(FCurrentStreamIdx);
-  Result:=IsValidStreamIndex(FCurrentStreamIdx);
-end;
-
-function TChainedStream.PrevStream: Boolean;
-begin
-  Dec(FCurrentStreamIdx);
-  Result:=IsValidStreamIndex(FCurrentStreamIdx);
-end;
-
-function TChainedStream.GetTotalSize: Int64;
-
+procedure TChainedStream.SetOwnsStreams(const aOwnsStreams: Boolean);
 var
 var
-  aCurrent: Integer;
-
-begin
-  Result:=0;
-  aCurrent:=FCurrentStreamIdx;
-  try
-    FCurrentStreamIdx:=0;
-    While CurrentStream<>Nil do
-      begin
-      Result:=Result+StreamSize;
-      NextStream;
-      end;
-  finally
-    FCurrentStreamIdx:=aCurrent;
-  end;
-end;
-
-function TChainedStream.GetSize: Int64;
-begin
-  Result:=GetTotalSize;
-end;
-
-constructor TChainedStream.Create(aChain: array of TStream; OwnsStreams: Boolean);
-
-Var
-  I : Integer;
-
-begin
-  SetLength(FStreams,Length(aChain));
-  For I:=0 to Length(aChain)-1 do
-    begin
-    FStreams[i].Stream:=aChain[i];
-    FStreams[i].Size:=-1;
-    end;
-  FCurrentStreamIdx:=0;
-end;
-
-
-destructor TChainedStream.Destroy;
-
-Var
-  I : Integer;
-
+  I: Integer;
 begin
 begin
-  If OwnsStreams then
-    For I:=0 to Length(FStreams) do
-      FreeAndNil(FStreams[i].Stream);
-  inherited Destroy;
+  FOwnsStreams := aOwnsStreams;
+  for I := 0 to Count-1 do
+    OwnsStream[I] := FOwnsStreams;
 end;
 end;
 
 
-function TChainedStream.Read(var Buffer; Count: Longint): Longint;
-
+constructor TChainedStream.Create(aChain: array of TStream; aOwnsStreams: Boolean);
 Var
 Var
-  aRead : Integer;
-  P : PByte;
-
-begin
-  Result:=0;
-  P:=@Buffer;
-  While (Count>0) and Assigned(CurrentStream) do
-    begin
-    aRead:=CurrentStream.Read(P^, Count);
-    Inc(P,aRead);
-    Dec(Count,aRead);
-    Inc(Result,aRead);
-    Inc(FPosition,aRead);
-    if Count>0 then
-      if NextStream then
-        CurrentStream.Position:=0
-      else
-        break;
-    end;
-end;
+  S: TStream;
 
 
-function TChainedStream.Write(const Buffer; Count: Longint): Longint;
-
-Var
-  aBufAvail,aToWrite,aWritten : Integer;
-  P : PByte;
-  
 begin
 begin
-  Result:=0;
-  P:=@Buffer;
-  While (Count>0) and Assigned(CurrentStream) do
-    begin
-    aBufAvail:=StreamSize-CurrentStream.Position;
-    aToWrite:=Count;
-    if aToWrite>aBufAvail then
-      aToWrite:=aBufAvail;
-    if aToWrite>0 then
-      begin
-      aWritten:=CurrentStream.Write(P^, aToWrite);
-      Inc(P,aWritten);
-      Dec(Count,aWritten);
-      Inc(Result,aWritten);
-      Inc(FPosition,aWritten);
-      end;
-    if (Count>0) then
-      if NextStream then
-        CurrentStream.Position:=0
-      else
-        break;
-    end;
-end;
-
-function TChainedStream.Seek(const Offset: Int64; Origin: TSeekOrigin): Int64;
-
-Var
-  aOff : Int64;
-
-  Procedure MoveForward(aStartPos : Int64; aOrigin : TSeekOrigin);
-
-  var
-    aSize : Int64;
+  inherited Create;
 
 
-  begin
-    aSize:=StreamSize;
-    while (aOff>aSize-aStartPos) do
-      begin
-      Dec(aOff,aSize-aStartPos);
-      Inc(FPosition,aSize-aStartPos);
-      if not NextStream then
-        Break;
-      aStartPos:=0;
-      aSize:=StreamSize;
-      end;
-    if CurrentStream=Nil then
-      FCurrentStreamIdx:=Length(FStreams)-1;
-    aSize:=StreamSize;
-    if aOff>aSize then
-       aOff:=aSize;
-    inc(FPosition,aOff);
-    Result:=FPosition;
-    CurrentStream.Seek(aOff,aOrigin);
-  end;
-
-  Procedure MoveBackward(aStartSize : Int64; aOrigin : TSeekOrigin);
-
-  var
-    aSize : Int64;
-
-  begin
-    aOff:=Abs(aOff);
-    aSize:=aStartSize;
-    while (aOff>aSize) do
-      begin
-      Dec(aOff,aSize);
-      Dec(FPosition,aSize);
-      if not PrevStream then
-        Break
-      else
-        begin
-        aSize:=StreamSize;
-        CurrentStream.Seek(0,soEnd);
-        end;
-      end;
-    if CurrentStream=Nil then
-      FCurrentStreamIdx:=0;
-    if aOff>aSize then
-      aOff:=aSize;
-    Dec(FPosition,aOff);
-    Result:=FPosition;
-    if (aOrigin=soCurrent) and (aStartSize<>StreamSize) then
-      CurrentStream.Seek(-aOff,soCurrent)
-    else
-      CurrentStream.Seek(-aOff,soEnd);
-  end;
-
-begin
-  if (Offset=0) and (Origin=soCurrent) then
-    Exit(FPosition);
-  aOff:=Offset;
-  Case origin of
-    soBeginning :
-      begin
-      FCurrentStreamIdx:=0;
-      FPosition:=0;
-      if aOff<0 then
-        exit(FPosition);
-      MoveForward(0,soBeginning);
-      end;
-    soCurrent :
-      begin
-      if aOff>0 then
-        begin
-        MoveForward(Currentstream.Position,soCurrent);
-        end
-      else
-        begin
-        MoveBackward(CurrentStream.Position,soCurrent)
-        end;
-      end;
-    soEnd:
-      begin
-      FCurrentStreamIdx:=Length(FStreams)-1;
-      FPosition:=GetTotalSize;
-      MoveBackward(StreamSize,SoEnd)
-      end;
-  end;
+  FOwnsStreams := aOwnsStreams;
+  for S in aChain do
+    AddStream(S, aOwnsStreams);
 end;
 end;
 
 
 end.
 end.

+ 40 - 0
rtl/objpas/classes/classesh.inc

@@ -1608,6 +1608,46 @@ type
   end;
   end;
 {$endif FPC_OS_UNICODE}
 {$endif FPC_OS_UNICODE}
 
 
+{ TProxyAggregateStream }
+
+  TProxyAggregateStream = class(TStream)
+  private type
+    TStreamEntry = record
+      Stream: TStream;
+      OwnsStream: Boolean;
+    end;
+  private
+    FStreams: array of TStreamEntry;
+    FCurrentStream: Integer;
+    FCurrentStreamPos: Int64;
+    FSize: Int64;
+    FPosition: Int64;
+    function GetCount: Integer;
+    function GetOwnsStream(AIndex: Integer): Boolean;
+    function GetStreams(AIndex: Integer): TStream;
+    procedure SetOwnsStream(AIndex: Integer; const aOwnsStream: Boolean);
+  protected
+    procedure SyncPosition;
+    function GetPosition: Int64; override;
+    function GetSize: Int64; override;
+  public
+    constructor Create;
+    destructor Destroy; override;
+    function Read(var Buffer; ACount: Longint): Longint; override;
+    function Write(const Buffer; ACount: Longint): Longint; override;
+    function Seek(Offset: Longint; Origin: Word): Longint; override; overload;
+    function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override; overload;
+
+    function AddStream(AStream: TStream; AOwnsStream: Boolean = False): Integer;
+    procedure RemoveStream(AStream: TStream); overload;
+    procedure RemoveStream(AIndex: Integer); overload;
+    procedure Clear;
+
+    property Count: Integer read GetCount;
+    property Streams[AIndex: Integer]: TStream read GetStreams;
+    property OwnsStream[AIndex: Integer]: Boolean read GetOwnsStream write SetOwnsStream;
+  end;
+
 { TStreamAdapter }
 { TStreamAdapter }
 
 
   TStreamOwnership = (soReference, soOwned);
   TStreamOwnership = (soReference, soOwned);

+ 208 - 0
rtl/objpas/classes/streams.inc

@@ -1918,6 +1918,214 @@ destructor TResourceStream.Destroy;
     inherited destroy;
     inherited destroy;
   end;
   end;
 
 
+{****************************************************************************}
+{*                        TProxyAggregateStream                             *}
+{****************************************************************************}
+
+{ TProxyAggregateStream }
+
+function TProxyAggregateStream.AddStream(AStream: TStream; AOwnsStream: Boolean): Integer;
+begin
+  try
+    AStream.Position := 0;
+    Inc(FSize, AStream.Size);
+
+    SetLength(FStreams, Length(FStreams)+1);
+    Result := High(FStreams);
+    FStreams[Result].Stream := AStream;
+    FStreams[Result].OwnsStream := AOwnsStream;
+  except
+    if AOwnsStream then
+      AStream.Free;
+    raise;
+  end;
+end;
+
+procedure TProxyAggregateStream.Clear;
+var
+  I: Integer;
+begin
+  FSize := 0;
+  for I := 0 to High(FStreams) do
+    if FStreams[I].OwnsStream then
+      FStreams[I].Stream.Free;
+  FStreams := nil;
+  FPosition := 0;
+  FCurrentStream := -1;
+end;
+
+constructor TProxyAggregateStream.Create;
+begin
+  inherited Create;
+
+  FCurrentStream := -1;
+end;
+
+destructor TProxyAggregateStream.Destroy;
+begin
+  Clear;
+
+  inherited Destroy;
+end;
+
+function TProxyAggregateStream.GetCount: Integer;
+begin
+  Result := Length(FStreams);
+end;
+
+function TProxyAggregateStream.GetOwnsStream(AIndex: Integer): Boolean;
+begin
+  Result := FStreams[AIndex].OwnsStream;
+end;
+
+function TProxyAggregateStream.GetPosition: Int64;
+begin
+  Result := FPosition;
+end;
+
+function TProxyAggregateStream.GetSize: Int64;
+begin
+  Result := FSize;
+end;
+
+function TProxyAggregateStream.GetStreams(AIndex: Integer): TStream;
+begin
+  Result := FStreams[AIndex].Stream;
+end;
+
+function TProxyAggregateStream.Read(var Buffer; ACount: Longint): Longint;
+var
+  P: PByte;
+  LRemain, LStreamRead, LStreamRemain, LStreamSize: Int64;
+begin
+  if (FCurrentStream=-1) or (
+           (FCurrentStream<Self.Count)
+       and (FCurrentStreamPos<>FStreams[FCurrentStream].Stream.Position))
+  then
+    SyncPosition;
+
+  Result := 0;
+  if (FPosition=FSize) or (ACount=0) then
+    Exit;
+  P := @Buffer;
+  while (Result<ACount) and (FCurrentStream<Self.Count) do
+  begin
+    LRemain := Int64(ACount)-Int64(Result);
+    LStreamSize := FStreams[FCurrentStream].Stream.Size;
+    LStreamRemain := LStreamSize-FStreams[FCurrentStream].Stream.Position;
+    if LRemain<LStreamRemain then
+      LStreamRead := LRemain
+    else
+      LStreamRead := LStreamRemain;
+    LStreamRead := FStreams[FCurrentStream].Stream.Read(P[Result], LStreamRead);
+    FCurrentStreamPos := FStreams[FCurrentStream].Stream.Position;
+    Inc(FPosition, LStreamRead);
+    Inc(Result, LStreamRead);
+    if (Result<ACount) and (FCurrentStreamPos>=LStreamSize) then
+    begin
+      Inc(FCurrentStream);
+      if FCurrentStream<Self.Count then
+      begin
+        FStreams[FCurrentStream].Stream.Position := 0;
+        FCurrentStreamPos := 0;
+      end;
+    end;
+  end;
+end;
+
+procedure TProxyAggregateStream.RemoveStream(AIndex: Integer);
+begin
+  Dec(FSize, FStreams[AIndex].Stream.Size);
+  if FStreams[AIndex].OwnsStream then
+    FStreams[AIndex].Stream.Free;
+  Delete(FStreams, AIndex, 1);
+  FPosition := 0;
+  FCurrentStream := -1;
+end;
+
+procedure TProxyAggregateStream.RemoveStream(AStream: TStream);
+var
+  I: Integer;
+begin
+  for I := 0 to High(FStreams) do
+    if FStreams[I].Stream=AStream then
+    begin
+      RemoveStream(I);
+      Exit;
+    end;
+end;
+
+function TProxyAggregateStream.Seek(const Offset: Int64; Origin: TSeekOrigin): Int64;
+var
+  LNewPos, LPrevPos: Int64;
+begin
+  LPrevPos := FPosition;
+  case Origin of
+    soBeginning: LNewPos := Offset;
+    soCurrent: LNewPos := FPosition + Offset;
+    soEnd: LNewPos := FSize + Offset;
+  end;
+  if LNewPos <= 0 then
+    FPosition := 0
+  else
+  if LNewPos > Size then
+    FPosition := Size
+  else
+    FPosition := LNewPos;
+  if LPrevPos <> FPosition then
+    FCurrentStream := -1; // we need SyncPosition
+  Result := FPosition;
+end;
+
+function TProxyAggregateStream.Seek(Offset: Longint; Origin: Word): Longint;
+begin
+  Result := Seek(Int64(Offset), TSeekOrigin(Origin)); // call Int64-Seek
+end;
+
+procedure TProxyAggregateStream.SetOwnsStream(AIndex: Integer; const aOwnsStream: Boolean);
+begin
+  FStreams[AIndex].OwnsStream := aOwnsStream;
+end;
+
+procedure TProxyAggregateStream.SyncPosition;
+  procedure _GoToEnd;
+  begin
+    FCurrentStream := Count-1;
+    if FCurrentStream>=0 then
+      FCurrentStreamPos := FStreams[FCurrentStream].Stream.Seek(0, soEnd);
+  end;
+var
+  LPosition, LStreamSize: Int64;
+  I: Integer;
+begin
+  if FPosition>=FSize then
+    _GoToEnd
+  else
+  begin
+    LPosition := 0;
+    for I := 0 to High(FStreams) do
+    begin
+      LStreamSize := FStreams[I].Stream.Size;
+      if FPosition<LPosition+LStreamSize then
+      begin
+        FCurrentStream := I;
+        FCurrentStreamPos := FStreams[FCurrentStream].Stream.Seek(FPosition-LPosition, soBeginning);
+        Exit;
+      end;
+      Inc(LPosition, LStreamSize);
+    end;
+  end;
+
+  // FPosition outside the size
+  _GoToEnd;
+end;
+
+function TProxyAggregateStream.Write(const Buffer; ACount: Longint): Longint;
+begin
+  Result := 0;
+  raise EStreamError.CreateRes(@SCantWriteAggregateStreamError);
+end;
+
 {****************************************************************************}
 {****************************************************************************}
 {*                             TOwnerStream                                 *}
 {*                             TOwnerStream                                 *}
 {****************************************************************************}
 {****************************************************************************}

+ 1 - 0
rtl/objpas/rtlconst.inc

@@ -282,6 +282,7 @@ ResourceString
   SStreamNoReading              = 'Reading from %s is not supported';
   SStreamNoReading              = 'Reading from %s is not supported';
   SStreamNoWriting              = 'Writing to %s is not supported';
   SStreamNoWriting              = 'Writing to %s is not supported';
   SStreamSetSize                = 'Error setting stream size';
   SStreamSetSize                = 'Error setting stream size';
+  SCantWriteAggregateStreamError= 'Can''t write to a read-only aggregate stream';
   SStringExpected               = 'String expected';
   SStringExpected               = 'String expected';
   SSymbolExpected               = '%s expected';
   SSymbolExpected               = '%s expected';
   SThreadCreateError            = 'Thread creation error: %s';
   SThreadCreateError            = 'Thread creation error: %s';