소스 검색

Core: Finished TPipelineQueue
- bug fixes
- merged into UThread
- added unit tests

Herman Schoenfeld 6 년 전
부모
커밋
d7b2fbea0f

+ 0 - 284
src/core/UPipeline.pas

@@ -1,284 +0,0 @@
-unit UPipeline;
-
-{ Copyright (c) 2019 by Herman Schoenfeld
-
-  Distributed under the MIT software license, see the accompanying file LICENSE
-  or visit http://www.opensource.org/licenses/mit-license.php.
-
-  This unit is a part of the PascalCoin Project, an infinitely scalable
-  cryptocurrency. Find us here:
-  Web: https://www.pascalcoin.org
-  Source: https://github.com/PascalCoin/PascalCoin
-
-  THIS LICENSE HEADER MUST NOT BE REMOVED.
-}
-
-{$IFDEF FPC}
-  {$MODE Delphi}
-{$ENDIF}
-
-interface
-
-uses
-  {$IFNDEF FPC}System.Generics.Collections{$ELSE}Generics.Collections{$ENDIF}, Classes, SyncObjs, SysUtils, UThread;
-
-Type
-  { TPipelineQueue }
-
-  TPipelineQueue<T> = class(TComponent)
-  private type
-    __TArray_T = TArray<T>;
-
-    { TStageQueue }
-
-    TStageQueue = class
-    private
-      FDirty : Boolean;
-      FLock : TMultiReadExclusiveWriteSynchronizer;
-      FItems : TList<T>;
-
-      function GetDirty : Boolean;
-      procedure SetDirty (AValue : Boolean);
-      function GetItems : TArray<T>;
-    public
-      constructor Create; overload;
-      destructor Destroy; override;
-      property Dirty : Boolean read GetDirty write SetDirty;
-      property Lock : TMultiReadExclusiveWriteSynchronizer read FLock;
-      property Items : TArray<T> read GetItems;
-    end;
-
-    { TErrorResult }
-
-    TErrorResult = record
-     Item : T;
-     ErrorMessage : String;
-    end;
-
-    { TPipelineWorkerThread}
-
-    TPipelineWorkerThread = class(TPCThread)
-    private
-      FPipeline : TPipelineQueue<T>;
-      FStage : Integer;
-    protected
-      procedure BCExecute; override;
-    public
-      constructor Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer); overload;
-    end;
-
-  private
-    FQueues : TArray<TStageQueue>;
-    FMaxWorkerThreads : Integer;
-    FActiveWorkerThreads : Integer;
-    {$IFDEF UNITTESTS}
-    FHistoricalMaxActiveWorkerThreads : Integer;
-    {$ENDIF}
-    procedure Initialize(AStageCount : Integer; AMaxWorkerThreadCount : Integer);
-    procedure Enqueue(AStage : Integer; const AItem : T); overload;
-    procedure EnqueueRange(AStage : Integer; const AItems : array of T); overload;
-    procedure NotifyPipelineAppended(AStage : Integer);
-    function GetStageCount : Integer; inline;
-  protected
-    function ProcessStage(AStageNum : Integer; const AItems : TArray<T>; out TErrors : TArray<TErrorResult>) : TArray<T>; virtual; abstract;
-    procedure HandleErrorItems(const AErrorItems : array of TErrorResult); virtual; abstract;
-    procedure HandleFinishedItems(const AItems : array of T); virtual; abstract;
-
-  public
-    property StageCount : Integer read GetStageCount;
-    {$IFDEF UNITTESTS}
-    property HistoricalMaxActiveWorkerThreads : Integer read FHistoricalMaxActiveWorkerThreads;
-    {$ENDIF}
-    constructor Create(AOwner : TComponent; AStageCount, AMaxWorkerThreads : Integer); overload;
-    destructor Destroy; override;
-    procedure Enqueue(const AItem : T); overload;
-    procedure EnqueueRange(const AItems : array of T); overload;
-  end;
-
-implementation
-
-{ TPipelineQueue }
-
-constructor TPipelineQueue<T>.Create(AOwner : TComponent; AStageCount, AMaxWorkerThreads : Integer);
-begin
-  inherited Create(AOwner);
-  Initialize(AStageCount, AMaxWorkerThreads);
-end;
-
-destructor TPipelineQueue<T>.Destroy;
-var i : Integer;
-begin
-  inherited;
-  for i := 0 to High(FQueues) do
-    FreeAndNil(FQueues[i]);
-end;
-
-procedure TPipelineQueue<T>.Initialize(AStageCount : Integer; AMaxWorkerThreadCount : Integer);
-var i : integer;
-begin
-  if AStageCount <= 0 then raise EArgumentException.Create('AStageCount must be greater than 0');
-  if AMaxWorkerThreadCount <= 0 then raise EArgumentException.Create('AMaxWorkerThreadCount must be greater than 0');
-  FMaxWorkerThreads := AMaxWorkerThreadCount;
-  FActiveWorkerThreads := 0;
-  SetLength(FQueues, AStagecount);
-  for i := 0 to AStageCount - 1 do begin
-    FQueues[i] := TStageQueue.Create;
-  end;
-end;
-
-procedure TPipelineQueue<T>.Enqueue(AStage : Integer; const AItem : T);
-begin
-  EnqueueRange(AStage, [AItem]);
-end;
-
-procedure TPipelineQueue<T>.EnqueueRange(AStage : Integer; const AItems : array of T);
-begin
-  FQueues[AStage].Lock.BeginWrite;
-  try
-    FQueues[AStage].FDirty := True;          // Dirty accessed without lock
-    FQueues[AStage].FItems.AddRange(AItems);
-  finally
-    FQueues[AStage].Lock.EndWrite;
-  end;
-  NotifyPipelineAppended(AStage);
-end;
-
-procedure TPipelineQueue<T>.Enqueue(const AItem : T);
-begin
-  Enqueue(0, AItem);
-end;
-
-procedure TPipelineQueue<T>.EnqueueRange(const AItems : array of T);
-begin
-  EnqueueRange(0, AItems);
-end;
-
-procedure TPipelineQueue<T>.NotifyPipelineAppended(AStage : Integer);
-begin
-  if (FActiveWorkerThreads = 0) OR (FActiveWorkerThreads < FMaxWorkerThreads) then begin
-    // Start a new worker thread to process
-    TPipelineWorkerThread.Create(Self, AStage);
-    {$IFDEF UNITTESTS}
-    if (FActiveWorkerThreads > FHistoricalMaxActiveWorkerThreads) then
-      FHistoricalMaxActiveWorkerThreads := FActiveWorkerThreads;
-    {$ENDIF}
-  end;
-end;
-
-function TPipelineQueue<T>.GetStageCount : Integer;
-begin
-  Result := Length(FQueues);
-end;
-
-{ TPipelineQueue<T>.TStageQueue }
-
-constructor TPipelineQueue<T>.TStageQueue.Create;
-begin
-  inherited;
-  FDirty := False;
-  FLock := TMultiReadExclusiveWriteSynchronizer.Create;
-  FItems := TList<T>.Create;
-end;
-
-destructor TPipelineQueue<T>.TStageQueue.Destroy;
-begin
-  inherited;
-  FreeAndNil(FLock);
-  FreeAndNil(FItems);
-end;
-
-function TPipelineQueue<T>.TStageQueue.GetDirty : Boolean;
-begin
-  FLock.BeginRead;
-  try
-    Result := FDirty;
-  finally
-    FLock.EndRead;
-  end;
-end;
-
-procedure TPipelineQueue<T>.TStageQueue.SetDirty ( AValue : Boolean );
-begin
-  FLock.BeginWrite;
-  try
-    FDirty := AValue;
-  finally
-    FLock.EndWrite;
-  end;
-end;
-
-function TPipelineQueue<T>.TStageQueue.GetItems : TArray<T>;
-begin
-  begin
-    FLock.BeginRead;
-    try
-      Result := FItems.ToArray;
-    finally
-      FLock.EndRead;
-    end;
-  end;
-end;
-
-{ TPipelineQueue<T>.TPipelineWorkerThread }
-
-constructor TPipelineQueue<T>.TPipelineWorkerThread.Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer);
-begin
- inherited Create(False);
- Self.FreeOnTerminate := true;
- FPipeline := APipelineQueue;
- FStage := AStage;
- Inc(FPipeline.FActiveWorkerThreads);
-end;
-
-procedure TPipelineQueue<T>.TPipelineWorkerThread.BCExecute;
-var
-  i : Integer;
-  LHasMore : Boolean;
-  LIn : TArray<T>;
-  LOut : TArray<T>;
-  LErrorOut : TArray<TErrorResult>;
-begin
-  repeat
-    // protect against excessive worker threads
-    if FPipeline.FActiveWorkerThreads > FPipeline.FMaxWorkerThreads then exit;
-
-    // double-check ensure still dirty
-    if not FPipeline.FQueues[FStage].FDirty then exit;
-
-    // Extract items from pipeline stage
-    FPipeline.FQueues[FStage].Lock.BeginWrite;
-    try
-      LIn := FPipeline.FQueues[FStage].FItems.ToArray;
-      FPipeline.FQueues[FStage].FItems.Clear;
-      FPipeline.FQueues[FStage].FDirty := False;
-    finally
-      FPipeline.FQueues[FStage].Lock.EndWrite;
-    end;
-
-    // process items
-    LOut := FPipeline.ProcessStage(FStage, LIn, LErrorOut);
-
-    // process errors
-    if Length(LErrorOut) > 0 then
-      FPipeline.HandleErrorItems(LErrorOut);
-
-    // send output to next queue (or finish)
-    if FStage = FPipeline.StageCount - 1 then
-      FPipeline.HandleFinishedItems(LOut)
-    else begin
-      // send to next queue
-      FPipeline.EnqueueRange(FStage + 1, LOut);
-    end;
-
-    // keep working until all stages are completed
-    LHasMore := False;
-    for i := 0 to High(FPipeline.FQueues) do begin
-      if FPipeline.FQueues[i].Dirty then
-        FStage := i;
-    end;
-  until not LHasMore;
-  Dec(FPipeline.FActiveWorkerThreads);
-end;
-
-end.
-

+ 273 - 4
src/core/UThread.pas

@@ -2,6 +2,9 @@ unit UThread;
 
 { Copyright (c) 2016 by Albert Molina
 
+  Acknowledgements:
+    Herman Schoenfeld <[email protected]> author of TPipelineQueue (2019)
+
   Distributed under the MIT software license, see the accompanying file LICENSE
   or visit http://www.opensource.org/licenses/mit-license.php.
 
@@ -31,7 +34,7 @@ uses
   {$IFDEF LINUX}cthreads,{$ENDIF}
 {$ENDIF}
   {$IFNDEF FPC}System.Generics.Collections{$ELSE}Generics.Collections{$ENDIF},
-  Classes, SyncObjs, UBaseTypes;
+  Classes, SyncObjs, SysUtils, UBaseTypes;
 
 {$I config.inc}
 
@@ -96,18 +99,86 @@ Type
     procedure UnlockList; inline;
   end;
 
+  { TPipelineQueue }
+
+  TPipelineQueue<T> = class(TComponent)
+  private type
+
+    { TStageQueue }
+
+    TStageQueue = class
+    private
+      FDirty : Boolean;
+      FLock : TMultiReadExclusiveWriteSynchronizer;
+      FItems : TList<T>;
+      function GetDirty : Boolean;
+      procedure SetDirty (AValue : Boolean);
+      function GetItems : TArray<T>;
+    public
+      constructor Create; overload;
+      destructor Destroy; override;
+      property Dirty : Boolean read GetDirty write SetDirty;
+      property Lock : TMultiReadExclusiveWriteSynchronizer read FLock;
+      property Items : TArray<T> read GetItems;
+    end;
+
+    { TErrorResult }
+
+    TErrorResult = record
+     Item : T;
+     ErrorMessage : String;
+    end;
+
+    { TPipelineWorkerThread}
+
+    TPipelineWorkerThread = class(TPCThread)
+    private
+      FPipeline : TPipelineQueue<T>;
+      FStage : Integer;
+    protected
+      procedure BCExecute; override;
+    public
+      constructor Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer); overload;
+    end;
+
+  private
+    FQueues : TList<TStageQueue>;
+    FMaxWorkerThreads : Integer;
+    FActiveWorkerThreads : Integer;
+    {$IFDEF UNITTESTS}
+    FHistoricalMaxActiveWorkerThreads : Integer;
+    {$ENDIF}
+    procedure Initialize(AStageCount : Integer; AMaxWorkerThreadCount : Integer);
+    procedure Enqueue(AStage : Integer; const AItem : T); overload;
+    procedure EnqueueRange(AStage : Integer; const AItems : array of T); overload;
+    procedure NotifyPipelineAppended(AStage : Integer);
+    function GetStageCount : Integer; inline;
+    function GetHasCompleted : Boolean;
+  protected
+    function ProcessStage(AStageNum : Integer; const AItems : TArray<T>; out AErrors : TArray<TErrorResult>) : TArray<T>; virtual; abstract;
+    procedure HandleErrorItems(const AErrorItems : array of TErrorResult); virtual; abstract;
+    procedure HandleFinishedItems(const AItems : array of T); virtual; abstract;
+  public
+    property StageCount : Integer read GetStageCount;
+    property HasCompleted : Boolean read GetHasCompleted;
+    {$IFDEF UNITTESTS}
+    property HistoricalMaxActiveWorkerThreads : Integer read FHistoricalMaxActiveWorkerThreads;
+    {$ENDIF}
+    constructor Create(AOwner : TComponent; AStageCount, AMaxWorkerThreads : Integer); overload;
+    destructor Destroy; override;
+    procedure Enqueue(const AItem : T); overload;
+    procedure EnqueueRange(const AItems : array of T); overload;
+  end;
 
 implementation
 
 uses
-  SysUtils, ULog, UConst;
+  ULog, UConst;
 
 { TPCThread }
 
 Var _threads : TPCThreadList<TPCThread>;
 
-
-
 constructor TPCThread.Create(CreateSuspended: Boolean);
 begin
   inherited Create(CreateSuspended);
@@ -441,6 +512,204 @@ begin
 end;
 {$ENDIF}
 
+{ TPipelineQueue }
+
+constructor TPipelineQueue<T>.Create(AOwner : TComponent; AStageCount, AMaxWorkerThreads : Integer);
+begin
+  inherited Create(AOwner);
+  Initialize(AStageCount, AMaxWorkerThreads);
+end;
+
+destructor TPipelineQueue<T>.Destroy;
+var i : Integer;
+begin
+  inherited;
+  for i := 0 to FQueues.Count - 1 do
+    FQueues[i].Destroy;
+  FreeAndNil(FQueues);
+end;
+
+procedure TPipelineQueue<T>.Initialize(AStageCount : Integer; AMaxWorkerThreadCount : Integer);
+var i : integer;
+begin
+  if AStageCount <= 0 then raise EArgumentException.Create('AStageCount must be greater than 0');
+  if AMaxWorkerThreadCount <= 0 then raise EArgumentException.Create('AMaxWorkerThreadCount must be greater than 0');
+  FMaxWorkerThreads := AMaxWorkerThreadCount;
+  FActiveWorkerThreads := 0;
+  FQueues := TList<TStageQueue>.Create;
+  for i := 0 to AStageCount - 1 do begin
+    FQueues.Add(  TStageQueue.Create );
+  end;
+end;
+
+procedure TPipelineQueue<T>.Enqueue(AStage : Integer; const AItem : T);
+begin
+  EnqueueRange(AStage, [AItem]);
+end;
+
+procedure TPipelineQueue<T>.EnqueueRange(AStage : Integer; const AItems : array of T);
+begin
+  FQueues[AStage].Lock.BeginWrite;
+  try
+    FQueues[AStage].FDirty := True;
+    FQueues[AStage].FItems.AddRange(AItems);
+  finally
+    FQueues[AStage].Lock.EndWrite;
+  end;
+  NotifyPipelineAppended(AStage);
+end;
+
+procedure TPipelineQueue<T>.Enqueue(const AItem : T);
+begin
+  Enqueue(0, AItem);
+end;
+
+procedure TPipelineQueue<T>.EnqueueRange(const AItems : array of T);
+begin
+  EnqueueRange(0, AItems);
+end;
+
+procedure TPipelineQueue<T>.NotifyPipelineAppended(AStage : Integer);
+begin
+  if (FActiveWorkerThreads = 0) OR (FActiveWorkerThreads < FMaxWorkerThreads) then begin
+    // Start a new worker thread to process
+    TPipelineWorkerThread.Create(Self, AStage);
+    {$IFDEF UNITTESTS}
+    if (FActiveWorkerThreads > FHistoricalMaxActiveWorkerThreads) then
+      FHistoricalMaxActiveWorkerThreads := FActiveWorkerThreads;
+    {$ENDIF}
+  end;
+end;
+
+function TPipelineQueue<T>.GetStageCount : Integer;
+begin
+  Result := FQueues.Count;
+end;
+
+function TPipelineQueue<T>.GetHasCompleted : Boolean;
+var i : Integer;
+begin
+  if FActiveWorkerThreads > 0 then Exit(False);
+  for i := 0 to FQueues.Count - 1 do
+    if FQueues[i].Dirty then Exit(False);
+  Result := true;
+end;
+
+{ TPipelineQueue<T>.TStageQueue }
+
+constructor TPipelineQueue<T>.TStageQueue.Create;
+begin
+  inherited;
+  FDirty := False;
+  FLock := TMultiReadExclusiveWriteSynchronizer.Create;
+  FItems := TList<T>.Create;
+end;
+
+destructor TPipelineQueue<T>.TStageQueue.Destroy;
+begin
+  inherited;
+  FreeAndNil(FLock);
+  FreeAndNil(FItems);
+end;
+
+function TPipelineQueue<T>.TStageQueue.GetDirty : Boolean;
+begin
+  FLock.BeginRead;
+  try
+    Result := FDirty;
+  finally
+    FLock.EndRead;
+  end;
+end;
+
+procedure TPipelineQueue<T>.TStageQueue.SetDirty ( AValue : Boolean );
+begin
+  FLock.BeginWrite;
+  try
+    FDirty := AValue;
+  finally
+    FLock.EndWrite;
+  end;
+end;
+
+function TPipelineQueue<T>.TStageQueue.GetItems : TArray<T>;
+begin
+  begin
+    FLock.BeginRead;
+    try
+      Result := FItems.ToArray;
+    finally
+      FLock.EndRead;
+    end;
+  end;
+end;
+
+{ TPipelineQueue<T>.TPipelineWorkerThread }
+
+constructor TPipelineQueue<T>.TPipelineWorkerThread.Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer);
+begin
+ inherited Create(True);
+ Self.FreeOnTerminate := true;
+ FPipeline := APipelineQueue;
+ FStage := AStage;
+ Inc(FPipeline.FActiveWorkerThreads);
+ Self.Start;
+end;
+
+procedure TPipelineQueue<T>.TPipelineWorkerThread.BCExecute;
+var
+  i, j : Integer;
+  LHasMore : Boolean;
+  LIn : TArray<T>;
+  LOut : TArray<T>;
+  LErrorOut : TArray<TErrorResult>;
+begin
+  try
+    repeat
+      // protect against excessive worker threads
+      if FPipeline.FActiveWorkerThreads > FPipeline.FMaxWorkerThreads then exit;
+
+      // double-check ensure still dirty
+      if not FPipeline.FQueues[FStage].FDirty then exit;
+
+      // Extract items from pipeline stage
+      FPipeline.FQueues[FStage].Lock.BeginWrite;
+      try
+        LIn := FPipeline.FQueues[FStage].FItems.ToArray;
+        FPipeline.FQueues[FStage].FItems.Clear;
+        FPipeline.FQueues[FStage].FDirty := False;
+      finally
+        FPipeline.FQueues[FStage].Lock.EndWrite;
+      end;
+
+      // process items
+      LOut := FPipeline.ProcessStage(FStage, LIn, LErrorOut);
+
+      // process errors
+      if Length(LErrorOut) > 0 then
+        FPipeline.HandleErrorItems(LErrorOut);
+
+      // send output to next queue (or finish)
+      if FStage < FPipeline.StageCount - 1 then
+        FPipeline.EnqueueRange(FStage + 1, LOut)
+      else
+        FPipeline.HandleFinishedItems(LOut);
+
+      // keep working until all stages are completed
+      LHasMore := False;
+      for i := 0 to FPipeline.FQueues.Count - 1 do begin
+        if FPipeline.FQueues[i].Dirty then begin
+          FStage := i;
+          LHasMore := True;
+          break;
+        end;
+      end;
+    until not LHasMore;
+  finally
+    Dec(FPipeline.FActiveWorkerThreads);
+  end;
+end;
+
 initialization
   _threads := TPCThreadList<TPCThread>.Create('GLOBAL_THREADS');
 finalization

+ 13 - 13
src/libraries/sphere10/UCommon.pas

@@ -42,10 +42,10 @@ const
 
 { GLOBAL HELPER FUNCTIONS }
 
-{function String2Hex(const Buffer: AnsiString): AnsiString;
-function Hex2Bytes(const AHexString: AnsiString): TBytes; overload;
-function TryHex2Bytes(const AHexString: AnsiString; out ABytes : TBytes): boolean; overload;
-function Bytes2Hex(const ABytes: TBytes; AUsePrefix : boolean = false) : AnsiString;}
+function String2Hex(const Buffer: String): String;
+function Hex2Bytes(const AHexString: String): TBytes; overload;
+function TryHex2Bytes(const AHexString: String; out ABytes : TBytes): boolean; overload;
+function Bytes2Hex(const ABytes: TBytes; AUsePrefix : boolean = false) : String;
 function BinStrComp(const Str1, Str2 : String): Integer; // Binary-safe StrComp replacement. StrComp will return 0 for when str1 and str2 both start with NUL character.
 function BytesCompare(const ABytes1, ABytes2: TBytes): integer;
 function BytesEqual(const ABytes1, ABytes2 : TBytes) : boolean; inline;
@@ -371,7 +371,7 @@ type
 
   TFileStreamHelper = class helper for TFileStream
     {$IFNDEF FPC}
-    procedure WriteAnsiString(const AString : String);
+    procedure WriteString(const AString : String);
     {$ENDIF}
   end;
 
@@ -433,7 +433,7 @@ var
 
 { Global helper functions }
 
-{function String2Hex(const Buffer: AnsiString): AnsiString;
+function String2Hex(const Buffer: String): String;
 var
   n: Integer;
 begin
@@ -442,16 +442,16 @@ begin
     Result := AnsiLowerCase(Result + IntToHex(Ord(Buffer[n]), 2));
 end;
 
-function Hex2Bytes(const AHexString: AnsiString): TBytes;
+function Hex2Bytes(const AHexString: String): TBytes;
 begin
   if NOT TryHex2Bytes(AHexString, Result) then
     raise EArgumentOutOfRangeException.Create('Invalidly formatted hexadecimal string.');
 end;
 
-function TryHex2Bytes(const AHexString: AnsiString; out ABytes : TBytes): boolean; overload;
+function TryHex2Bytes(const AHexString: String; out ABytes : TBytes): boolean; overload;
 var
   P : PAnsiChar;
-  LHexString : AnsiString;
+  LHexString : String;
   LHexIndex, LHexLength, LHexStart : Integer;
 begin
   SetLength(ABytes, 0);
@@ -479,10 +479,10 @@ begin
   Result := (LHexIndex = (LHexLength DIV 2));
 end;
 
-function Bytes2Hex(const ABytes: TBytes; AUsePrefix : boolean = false) : AnsiString;
+function Bytes2Hex(const ABytes: TBytes; AUsePrefix : boolean = false) : String;
 var
   i, LStart, LLen : Integer;
-  s : AnsiString;
+  s : String;
   b : Byte;
 begin
   LLen := System.Length(ABytes)*2;
@@ -509,7 +509,7 @@ begin
     Result[(i*2)+ LStart + 1] := s[2];
     Inc(i);
   end;
-end;      }
+end;
 
 function BinStrComp(const Str1, Str2: String): integer;
 var Str1Len, Str2Len, i : Integer;
@@ -1830,7 +1830,7 @@ end;
 
 { TFileStreamHelper }
 {$IFNDEF FPC}
-procedure TFileStreamHelper.WriteAnsiString(const AString : String);
+procedure TFileStreamHelper.WriteString(const AString : String);
 begin
    Self.WriteBuffer(Pointer(AString)^, Length(AString));
 end;

+ 0 - 1
src/pascalcoin_wallet_classic.dpr

@@ -30,7 +30,6 @@ uses
   UOpenSSL in 'core\UOpenSSL.pas',
   UOpenSSLdef in 'core\UOpenSSLdef.pas',
   UOpTransaction in 'core\UOpTransaction.pas',
-  UPipeline in 'core\UPipeline.pas',
   UPoolMinerThreads in 'core\UPoolMinerThreads.pas',
   UPoolMining in 'core\UPoolMining.pas',
   URandomHash in 'core\URandomHash.pas',

+ 0 - 1
src/pascalcoin_wallet_classic.dproj

@@ -124,7 +124,6 @@
         <DCCReference Include="core\UOpenSSL.pas"/>
         <DCCReference Include="core\UOpenSSLdef.pas"/>
         <DCCReference Include="core\UOpTransaction.pas"/>
-        <DCCReference Include="core\UPipeline.pas"/>
         <DCCReference Include="core\UPoolMinerThreads.pas"/>
         <DCCReference Include="core\UPoolMining.pas"/>
         <DCCReference Include="core\URandomHash.pas"/>

+ 4 - 15
src/tests/PascalCoinUnitTests.lpi

@@ -32,7 +32,7 @@
         <PackageName Value="FCL"/>
       </Item3>
     </RequiredPackages>
-    <Units Count="19">
+    <Units Count="17">
       <Unit0>
         <Filename Value="PascalCoinUnitTests.lpr"/>
         <IsPartOfProject Value="True"/>
@@ -101,17 +101,9 @@
         <IsPartOfProject Value="True"/>
       </Unit15>
       <Unit16>
-        <Filename Value="..\libraries\pascalcoin\UPipeline.pas"/>
+        <Filename Value="UThread.Tests.pas"/>
         <IsPartOfProject Value="True"/>
       </Unit16>
-      <Unit17>
-        <Filename Value="UPipeline.Tests.pas"/>
-        <IsPartOfProject Value="True"/>
-      </Unit17>
-      <Unit18>
-        <Filename Value="..\core\UPipeline.pas"/>
-        <IsPartOfProject Value="True"/>
-      </Unit18>
     </Units>
   </ProjectOptions>
   <CompilerOptions>
@@ -125,13 +117,10 @@
       <OtherUnitFiles Value="..\..\src;..\core;..\libraries\generics.collections;..\libraries\hashlib4pascal;..\libraries\sphere10;..\libraries\pascalcoin"/>
       <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/>
     </SearchPaths>
-    <CodeGeneration>
-      <Optimizations>
-        <OptimizationLevel Value="4"/>
-      </Optimizations>
-    </CodeGeneration>
     <Linking>
       <Options>
+        <PassLinkerOptions Value="True"/>
+        <LinkerOptions Value=" -dUseCThreads"/>
         <Win32>
           <GraphicApplication Value="True"/>
         </Win32>

+ 1 - 2
src/tests/PascalCoinUnitTests.lpr

@@ -4,8 +4,7 @@ program UPascalCoinUnitTests;
 
 uses
   Interfaces, Forms, GuiTestRunner, UCommon.Collections, URandomHashTests,
-  UCommon.Tests, UCommon.Collections.Tests, UMemory.Tests, UPipeline.Tests,
-  UPipeline;
+  UCommon.Tests, UCommon.Collections.Tests, UMemory.Tests, UThread.Tests;
 
 {$R *.res}
 

+ 0 - 31
src/tests/UPipeline.Tests.pas

@@ -1,31 +0,0 @@
-unit UPipeline.Tests;
-
-{$mode delphi}
-{$H+}
-{$modeswitch nestedprocvars}
-
-interface
-
-uses
-  Classes, SysUtils, fpcunit, testutils, testregistry,
-  UPipeline;
-
-type
-
-  TPipelineTests = class(TTestCase)
-    published
-      procedure Test1;
-  end;
-
-implementation
-
-procedure TPipelineTests.Test1;
-begin
-
-end;
-
-initialization
-  RegisterTest(TPipelineTests);
-
-end.
-

+ 440 - 0
src/tests/UThread.Tests.pas

@@ -0,0 +1,440 @@
+unit UThread.Tests;
+
+{$mode delphi}
+{$H+}
+{$modeswitch nestedprocvars}
+
+interface
+
+uses
+  Classes, SysUtils, fpcunit, testutils, testregistry,
+  UThread, Generics.Collections;
+
+type
+
+  TTestRecord = record
+    Number : Integer;
+  end;
+
+  TSimplePipeline = class(TPipelineQueue<TTestRecord>)
+  private
+   FSucceeded : TList<TTestRecord>;
+   FErrors : TList<TErrorResult>;
+   function StageHandler(const AItems : TArray<TTestRecord>; out AErrors : TArray<TErrorResult>) : TArray<TTestRecord>;
+  protected
+    function ProcessStage(AStageNum : Integer; const AItems : TArray<TTestRecord>; out AErrors : TArray<TErrorResult>) : TArray<TTestRecord>; override;
+    procedure HandleErrorItems(const AErrorItems : array of TErrorResult); override;
+    procedure HandleFinishedItems(const AItems : array of TTestRecord); override;
+  public
+    property Succeeded : TList<TTestRecord> read FSucceeded;
+    property Errors : TList<TErrorResult> read FErrors;
+    constructor Create(AOwner : TComponent; AStageCount, AMaxWorkerThreads : Integer); overload;
+    destructor Destroy; overload;
+  end;
+
+  TMempoolOperation = record
+    Sender : String;
+    Operation : String; // in production TPCOperationResume
+    CurrentSignerNOperation : UInt32;
+    SignerKey : String;
+    OtherNOperations : TArray<UInt32>;
+    OtherKeys : TArray<String>;
+  end;
+
+  TMempoolMock = class(TPipelineQueue<TMempoolOperation>)
+  private const
+    EXTRACT_ACCOUNT_DATA_LAG_MS = 2;
+    VERIFY_SIGNATURE_LAG = 5;
+    SEND_OPERATION_LAG = 5;
+    LOG_OPERATION_LAG = 1;
+    ACCEPT_OPERATION_LAG = 1;
+
+    ILLEGAL_OP_RATE = 0.05; // percent of operations that are bad
+    SEND_FAILURE_RATE = 0.05; // percent of operations that fail to propagate
+
+  private
+    FSafeBoxLock : TPCCriticalSection;
+  protected
+    function ProcessStage(AStageNum : Integer; const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>; override;
+    procedure HandleErrorItems(const AErrorItems : array of TErrorResult); override;
+    procedure HandleFinishedItems(const AItems : array of TMempoolOperation); override;
+
+    // Mempool stages
+    function ExtractSafeBoxData(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+    function VerifyOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+    function PropagateOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+    function AcceptOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+
+  public
+    constructor Create(AOwner : TComponent); override;
+    destructor Destroy; override;
+    procedure SimulateClassicProcess(const AItems : array of TMempoolOperation);
+  end;
+
+  TPipelineTests = class(TTestCase)
+  private
+    procedure TestSimplePipeline(AStages, AWorkers : Integer);
+  published
+    procedure TestSimplePipeline_1Stage_1Worker;
+    procedure TestSimplePipeline_3Stages_1Workers;
+    procedure TestSimplePipeline_3Stages_2Workers;
+    procedure TestSimplePipeline_3Stages_3Workers;
+    procedure TestSimplePipeline_3Stages_4Workers;
+    procedure TestMempoolSimulation;
+    procedure TestMempoolClassicSimulation;
+  end;
+
+implementation
+
+uses UMemory;
+
+{ Implementation Functions }
+
+procedure DoSomeWork(ANumAdditions, ANumMuls, ANumDivs : Integer);
+var i, j : Integer;
+
+begin
+ for i := 1 to ANumAdditions do
+   j := i + (i+1);
+ for i := 1 to ANumMuls do
+   j := i * (i+1);
+ for i := 1 to ANumDivs do
+   j := i div (i+1);
+end;
+
+{ TSimplePipeline }
+
+constructor TSimplePipeline.Create(AOwner: TComponent; AStageCount, AMaxWorkerThreads : Integer);
+begin
+  inherited Create(AOwner, AStageCount, AMaxWorkerThreads);
+  FSucceeded := TList<TTestRecord>.Create;
+  FErrors := TList<TErrorResult>.Create;
+end;
+
+destructor TSimplePipeline.Destroy;
+begin
+ FSucceeded.Destroy;
+ FErrors.Destroy;
+end;
+
+function TSimplePipeline.ProcessStage(AStageNum : Integer; const AItems : TArray<TTestRecord>; out AErrors : TArray<TErrorResult>) : TArray<TTestRecord>;
+begin
+ // Just repeat same stage each time, typically would use different stage per AStageNum
+ Result := StageHandler(AItems, AErrors);
+end;
+
+procedure TSimplePipeline.HandleErrorItems(const AErrorItems: array of TErrorResult);
+begin
+  FErrors.AddRange(AErrorItems);
+end;
+
+procedure TSimplePipeline.HandleFinishedItems(const AItems : array of TTestRecord);
+begin
+  FSucceeded.AddRange(AItems);
+end;
+
+function TSimplePipeline.StageHandler(const AItems : TArray<TTestRecord>; out AErrors : TArray<TErrorResult>) : TArray<TTestRecord>;
+var
+  i : Integer;
+  LSuccess : TList<TTestRecord>;
+  LErrors : TList<TErrorResult>;
+  LError : TErrorResult;
+  LDisposables : TDisposables;
+begin
+  // Setup temp collections to build processed items
+  LSuccess := LDisposables.AddObject( TList<TTestRecord>.Create ) as TList<TTestRecord>;
+  LErrors := LDisposables.AddObject( TList<TErrorResult>.Create ) as TList<TErrorResult>;
+
+  // Filter out odd numbers
+  for i := Low(AItems) to High(AItems) do begin
+    if AItems[i].Number mod 2 = 0 then begin
+      LSuccess.Add(AItems[i]);
+    end else begin
+      LError.Item := AItems[i];
+      LError.ErrorMessage := 'Item number was odd';
+      LErrors.Add(LError);
+    end;
+    Sleep(5);
+  end;
+
+  // Return processed items
+  AErrors := LErrors.ToArray;
+  Result := LSuccess.ToArray;
+end;
+
+{ TMempoolMock }
+
+constructor TMempoolMock.Create(AOwner : TComponent);
+begin
+  inherited Create(AOwner, 4, 2); // 4 stages with 2 worker threads
+  FSafeBoxLock := TPCCriticalSection.Create('TMempoolMock');
+end;
+
+destructor TMempoolMock.Destroy;
+begin
+  inherited;
+  FreeAndNil(FSafeBoxLock);
+end;
+
+function TMempoolMock.ProcessStage(AStageNum : Integer; const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+begin
+  case AStageNum of
+    0: Result := ExtractSafeBoxData(AItems, AErrors); // Stage 1: Extract Safebox data (NOperation/Key)
+    1: Result := VerifyOperations(AItems, AErrors); // Stage 2: Verify operations
+    2: begin
+        // Stage 3: Propagate operations to other nodes
+         PropagateOperations(AItems, AErrors);
+         Result := AItems; // use argument, since we don't care about send failures in next phase
+       end;
+    3: Result := AcceptOperations(AItems, AErrors); // Stage 3: Accept operations in local SafeBox transaction
+  end;
+end;
+
+procedure TMempoolMock.HandleErrorItems(const AErrorItems : array of TErrorResult);
+var i : Integer;
+begin
+ for i := Low(AErrorItems) to High(AErrorItems) do
+   Sleep(LOG_OPERATION_LAG);
+end;
+
+procedure TMempoolMock.HandleFinishedItems(const AItems : array of TMempoolOperation);
+begin
+  // end-of-pipeline, maybe save to disk?
+end;
+
+// Mempool stages
+function TMempoolMock.ExtractSafeBoxData(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+var
+  LDisposables : TDisposables;
+  LOutput : TList<TMempoolOperation>;
+  i : Integer;
+begin
+ LOutput := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+ FSafeBoxLock.Enter;
+ try
+   for i := Low(AItems) to High(AItems) do begin
+     //AItems[i].Signer.PublicKey := lookup pubkey from safebox
+     //AItems[i].Signer.NOperation := lookup n-operation from safebox
+     DoSomeWork(10000, 10000, 10000);
+     LOutput.Add(AItems[i]);
+   end;
+ finally
+   FSafeBoxLock.Release;
+ end;
+ Result := LOutput.ToArray;
+ SetLength(AErrors, 0);
+end;
+
+function TMempoolMock.VerifyOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+var
+  LDisposables : TDisposables;
+  LOutput : TList<TMempoolOperation>;
+  LErrors : TList<TErrorResult>;
+  LError : TErrorResult;
+  i : Integer;
+begin
+  LOutput := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+  LErrors := LDisposables.AddObject( TList<TErrorResult>.Create ) as TList<TErrorResult>;
+  for i := Low(AItems) to High(AItems) do begin
+    // if VerifySignature(AItems[i].Signature, AItems[i].SignerPubKey) then error
+    DoSomeWork(10000, 10000, 10000);
+    if random() >= ILLEGAL_OP_RATE then
+      LOutput.Add(AItems[i])
+    else begin
+      LError.Item := AItems[i];
+      LError.ErrorMessage := 'Invalid Operations';
+      LErrors.Add(LError);
+    end;
+  end;
+  Result := LOutput.ToArray;
+  AErrors := LErrors.ToArray;
+end;
+
+function TMempoolMock.PropagateOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+var
+  LDisposables : TDisposables;
+  LOutput : TList<TMempoolOperation>;
+  LErrors : TList<TErrorResult>;
+  LError : TErrorResult;
+  i : Integer;
+begin
+  LOutput := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+  LErrors := LDisposables.AddObject( TList<TErrorResult>.Create ) as TList<TErrorResult>;
+  for i := Low(AItems) to High(AItems) do begin
+    // Send AItems[i] to all connected peers (except sender of AItems[i])
+    DoSomeWork(10000, 10000, 10000);
+    if random() >= SEND_FAILURE_RATE then
+      LOutput.Add(AItems[i])
+    else begin
+      LError.Item := AItems[i];
+      LError.ErrorMessage := 'Failed to propagate operation';
+      LErrors.Add(LError);
+    end;
+  end;
+  Result := LOutput.ToArray;
+  AErrors := LErrors.ToArray;
+end;
+
+function TMempoolMock.AcceptOperations(const AItems : TArray<TMempoolOperation>; out AErrors : TArray<TErrorResult>) : TArray<TMempoolOperation>;
+var
+  LDisposables : TDisposables;
+  LOutput : TList<TMempoolOperation>;
+  LErrors : TList<TErrorResult>;
+  LError : TErrorResult;
+  i : Integer;
+begin
+  LOutput := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+  LErrors := LDisposables.AddObject( TList<TErrorResult>.Create ) as TList<TErrorResult>;
+  FSafeBoxLock.Enter;
+  try
+    for i := Low(AItems) to High(AItems) do begin
+      DoSomeWork(10000, 10000, 10000);
+      if random() >= ILLEGAL_OP_RATE then
+        LOutput.Add(AItems[i])
+      else begin
+        LError.Item := AItems[i];
+        LError.ErrorMessage := 'Failed to add operation to mempool';
+        LErrors.Add(LError);
+      end;
+    end;
+  finally
+    FSafeBoxLock.Release;
+  end;
+  Result := LOutput.ToArray;
+  AErrors := LErrors.ToArray;
+end;
+
+// Simulates the existing workflow of single-threaded single-operation processing
+procedure TMempoolMock.SimulateClassicProcess(const AItems : array of TMempoolOperation);
+var
+  i : Integer;
+  LOperations : TArray<TMempoolOperation>;
+  LErrors : TArray<TErrorResult>;
+begin
+  for i := Low(AItems) to High(AItems) do begin
+    LOperations := TArray<TMempoolOperation>.Create( AItems[i] );  // only 1 operation
+    FSafeBoxLock.Enter;
+    try
+      LOperations := ExtractSafeBoxData(LOperations, LErrors);
+      if Length(LErrors) > 0  then
+        HandleErrorItems(LErrors);
+
+      LOperations := VerifyOperations(LOperations, LErrors);
+      if Length(LErrors) > 0  then
+        HandleErrorItems(LErrors);
+
+      PropagateOperations(LOperations, LErrors);
+      if Length(LErrors) > 0  then
+        HandleErrorItems(LErrors);
+
+      AcceptOperations(LOperations, LErrors);
+      if Length(LErrors) > 0  then
+        HandleErrorItems(LErrors);
+
+    finally
+      FSafeBoxLock.Release;
+    end;
+  end;
+end;
+
+{ TPipelineTests }
+
+procedure TPipelineTests.TestSimplePipeline_1Stage_1Worker;
+begin
+  TestSimplePipeline(1, 1);
+end;
+
+procedure TPipelineTests.TestSimplePipeline_3Stages_1Workers;
+begin
+  TestSimplePipeline(3, 1);
+end;
+
+procedure TPipelineTests.TestSimplePipeline_3Stages_2Workers;
+begin
+  TestSimplePipeline(3, 2);
+end;
+
+procedure TPipelineTests.TestSimplePipeline_3Stages_3Workers;
+begin
+  TestSimplePipeline(3, 3);
+end;
+
+procedure TPipelineTests.TestSimplePipeline_3Stages_4Workers;
+begin
+  TestSimplePipeline(3, 4);
+end;
+
+procedure TPipelineTests.TestSimplePipeline(AStages, AWorkers : Integer);
+var
+  LPipeline : TSimplePipeline;
+  LDisposables : TDisposables;
+  LList : TList<TTestRecord>;
+  LRec : TTestRecord;
+  i : Integer;
+begin
+  LPipeline := LDisposables.AddObject( TSimplePipeline.Create(nil, AStages, AWorkers) ) as TSimplePipeline;
+  LList := LDisposables.AddObject( TList<TTestRecord>.Create ) as TList<TTestRecord>;
+  for i := 1 to 100 do begin
+    LRec.Number := i;
+    LList.Add(LRec);
+    if i mod 5 = 0 then begin
+      LPipeline.EnqueueRange(LList.ToArray);
+      LList.Clear;
+    end;
+  end;
+  while NOT LPipeline.HasCompleted do
+    Sleep(10);
+  AssertEquals(50, LPipeline.Succeeded.Count);
+  AssertEquals(50, LPipeline.Errors.Count);
+  AssertEquals(AWorkers, LPipeline.HistoricalMaxActiveWorkerThreads);
+  for i := 0 to LPipeline.Succeeded.Count - 1 do
+    AssertEquals(LPipeline.Succeeded[i].Number mod 2, 0);
+  for i := 0 to LPipeline.Errors.Count - 1 do
+    AssertEquals(LPipeline.Errors[i].Item.Number mod 2, 1);
+end;
+
+procedure TPipelineTests.TestMempoolSimulation;
+var
+  LMempool : TMempoolMock;
+  LDisposables : TDisposables;
+  LList : TList<TMempoolOperation>;
+  LRec : TMempoolOperation;
+  i : Integer;
+begin
+  LMempool := LDisposables.AddObject( TMempoolMock.Create(nil) ) as TMempoolMock;
+  LList := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+  for i := 1 to 3000 do begin
+    LList.Add(LRec);
+    if i mod 5 = 0 then begin
+      LMempool.SimulateClassicProcess(LList.ToArray);
+      LList.Clear;
+    end;
+  end;
+  while NOT LMempool.HasCompleted do
+    Sleep(10);
+end;
+
+procedure TPipelineTests.TestMempoolClassicSimulation;
+var
+  LMempool : TMempoolMock;
+  LDisposables : TDisposables;
+  LList : TList<TMempoolOperation>;
+  LRec : TMempoolOperation;
+  i : Integer;
+begin
+  LMempool := LDisposables.AddObject( TMempoolMock.Create(nil) ) as TMempoolMock;
+  LList := LDisposables.AddObject( TList<TMempoolOperation>.Create ) as TList<TMempoolOperation>;
+  for i := 1 to 3000 do begin
+    LList.Add(LRec);
+    if i mod 5 = 0 then begin
+      LMempool.SimulateClassicProcess(LList.ToArray);
+      LList.Clear;
+    end;
+  end;
+end;
+
+initialization
+  RegisterTest(TPipelineTests);
+
+end.
+