|
@@ -20,14 +20,14 @@ unit UPipeline;
|
|
|
interface
|
|
|
|
|
|
uses
|
|
|
- Classes, SyncObjs, SysUtils, UThread, Generics.Collections;
|
|
|
+ Generics.Collections, Classes, SyncObjs, SysUtils, UThread;
|
|
|
|
|
|
Type
|
|
|
-
|
|
|
{ TPipelineQueue }
|
|
|
|
|
|
TPipelineQueue<T> = class(TComponent)
|
|
|
private type
|
|
|
+ __TArray_T = TArray<T>;
|
|
|
|
|
|
{ TStageQueue }
|
|
|
|
|
@@ -51,8 +51,8 @@ Type
|
|
|
{ TErrorResult }
|
|
|
|
|
|
TErrorResult = record
|
|
|
- Item : T;
|
|
|
- ErrorMessage : String;
|
|
|
+ Item : T;
|
|
|
+ ErrorMessage : String;
|
|
|
end;
|
|
|
|
|
|
{ TPipelineWorkerThread}
|
|
@@ -81,8 +81,8 @@ Type
|
|
|
function GetStageCount : Integer; inline;
|
|
|
protected
|
|
|
function ProcessStage(AStageNum : Integer; const AItems : TArray<T>; out TErrors : TArray<TErrorResult>) : TArray<T>; virtual; abstract;
|
|
|
- procedure HandleErrorItems(const AErrorItems : array of T); virtual; abstract;
|
|
|
- procedure HandleFinishedItems(const AErrorItems : array of T); virtual; abstract;
|
|
|
+ procedure HandleErrorItems(const AErrorItems : array of TErrorResult); virtual; abstract;
|
|
|
+ procedure HandleFinishedItems(const AItems : array of T); virtual; abstract;
|
|
|
|
|
|
public
|
|
|
property StageCount : Integer read GetStageCount;
|
|
@@ -126,12 +126,12 @@ begin
|
|
|
end;
|
|
|
end;
|
|
|
|
|
|
-procedure TPipelineQueue<T>.Enqueue(AStage : Integer; const AItem : T); overload;
|
|
|
+procedure TPipelineQueue<T>.Enqueue(AStage : Integer; const AItem : T);
|
|
|
begin
|
|
|
EnqueueRange(AStage, [AItem]);
|
|
|
end;
|
|
|
|
|
|
-procedure TPipelineQueue<T>.EnqueueRange(AStage : Integer; const AItems : array of T); overload;
|
|
|
+procedure TPipelineQueue<T>.EnqueueRange(AStage : Integer; const AItems : array of T);
|
|
|
begin
|
|
|
FQueues[AStage].Lock.BeginWrite;
|
|
|
try
|
|
@@ -143,12 +143,12 @@ begin
|
|
|
NotifyPipelineAppended(AStage);
|
|
|
end;
|
|
|
|
|
|
-procedure TPipelineQueue<T>.Enqueue(const AItem : T); overload;
|
|
|
+procedure TPipelineQueue<T>.Enqueue(const AItem : T);
|
|
|
begin
|
|
|
Enqueue(0, AItem);
|
|
|
end;
|
|
|
|
|
|
-procedure TPipelineQueue<T>.EnqueueRange(const AItems : array of T); overload;
|
|
|
+procedure TPipelineQueue<T>.EnqueueRange(const AItems : array of T);
|
|
|
begin
|
|
|
EnqueueRange(0, AItems);
|
|
|
end;
|
|
@@ -157,7 +157,6 @@ procedure TPipelineQueue<T>.NotifyPipelineAppended(AStage : Integer);
|
|
|
begin
|
|
|
if (FActiveWorkerThreads = 0) OR (FActiveWorkerThreads < FMaxWorkerThreads) then begin
|
|
|
// Start a new worker thread to process
|
|
|
- Inc(FActiveWorkerThreads);
|
|
|
TPipelineWorkerThread.Create(Self, AStage);
|
|
|
{$IFDEF UNITTESTS}
|
|
|
if (FActiveWorkerThreads > FHistoricalMaxActiveWorkerThreads) then
|
|
@@ -222,7 +221,7 @@ end;
|
|
|
|
|
|
{ TPipelineQueue<T>.TPipelineWorkerThread }
|
|
|
|
|
|
-constructor TPipelineQueue<T>.TPipelineWorkerThread.Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer); overload;
|
|
|
+constructor TPipelineQueue<T>.TPipelineWorkerThread.Create(const APipelineQueue : TPipelineQueue<T>; AStage : Integer);
|
|
|
begin
|
|
|
inherited Create(False);
|
|
|
Self.FreeOnTerminate := true;
|