Browse Source

[threads] background workers refactorized

Exilon 4 năm trước cách đây
mục cha
commit
9207eda28a
1 tập tin đã thay đổi với 103 bổ sung18 xóa
  1. 103 18
      Quick.Threads.pas

+ 103 - 18
Quick.Threads.pas

@@ -7,7 +7,7 @@
   Author      : Kike Pérez
   Author      : Kike Pérez
   Version     : 1.5
   Version     : 1.5
   Created     : 09/03/2018
   Created     : 09/03/2018
-  Modified    : 08/02/2021
+  Modified    : 08/03/2021
 
 
   This file is part of QuickLib: https://github.com/exilon/QuickLib
   This file is part of QuickLib: https://github.com/exilon/QuickLib
 
 
@@ -595,12 +595,29 @@ type
   private
   private
     fWorkerPool : TWorkerPool;
     fWorkerPool : TWorkerPool;
     fConcurrentWorkers : Integer;
     fConcurrentWorkers : Integer;
-    fWorkerTask : IWorkTask;
+    fWorkerInitProc : TTaskProc;
+    fWorkerExecuteProc : TTaskProc;
+    fWorkerRetryProc : TTaskRetryProc;
+    fWorkerExceptionProc : TTaskExceptionProc;
+    fWorkerTerminateProc : TTaskProc;
+    fMaxRetries : Integer;
+    fFaultPolicy : TFaultPolicy;
+    procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
   public
   public
-    constructor Create(aConcurrentWorkers : Integer);
+    constructor Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
     destructor Destroy; override;
     destructor Destroy; override;
     property ConcurrentWorkers : Integer read fConcurrentWorkers;
     property ConcurrentWorkers : Integer read fConcurrentWorkers;
-    function OnExecute(aWorkerProc : TTaskProc) : IWorkTask;
+    function OnInitialize(aTaskProc : TTaskProc) : TBackgroundWorkers;
+    function OnException(aTaskProc : TTaskExceptionProc) : TBackgroundWorkers;
+    function OnRetry(aTaskProc : TTaskRetryProc) : TBackgroundWorkers;
+    function OnTerminated(aTaskProc : TTaskProc) : TBackgroundWorkers;
+    function Retry(aMaxRetries : Integer) : TBackgroundWorkers;
+    function RetryForever : TBackgroundWorkers;
+    function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
+    function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : TBackgroundWorkers; overload;
+    function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
+    function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
+    function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
     procedure Start;
     procedure Start;
     procedure Stop;
     procedure Stop;
   end;
   end;
@@ -1616,13 +1633,13 @@ begin
       end;
       end;
     finally
     finally
       fStatus := TWorkerStatus.wsIdle;
       fStatus := TWorkerStatus.wsIdle;
-      if fRunOnce then Terminate;
       try
       try
         if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
         if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
           else fCurrentTask.DoTerminate;
           else fCurrentTask.DoTerminate;
       except
       except
         on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
         on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
       end;
       end;
+      if fRunOnce then Terminate;
     end;
     end;
   end;
   end;
   fStatus := TWorkerStatus.wsSuspended
   fStatus := TWorkerStatus.wsSuspended
@@ -2331,9 +2348,11 @@ end;
 
 
 { TBackgroundWorkers }
 { TBackgroundWorkers }
 
 
-constructor TBackgroundWorkers.Create(aConcurrentWorkers: Integer);
+constructor TBackgroundWorkers.Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
 begin
 begin
   fConcurrentWorkers := aConcurrentWorkers;
   fConcurrentWorkers := aConcurrentWorkers;
+  fWorkerExecuteProc := aWorkerProc;
+  fWorkerPool := TWorkerPool.Create(True);
 end;
 end;
 
 
 destructor TBackgroundWorkers.Destroy;
 destructor TBackgroundWorkers.Destroy;
@@ -2342,25 +2361,23 @@ begin
   inherited;
   inherited;
 end;
 end;
 
 
-function TBackgroundWorkers.OnExecute(aWorkerProc: TTaskProc): IWorkTask;
-begin
-//  fWorkerTask := TWorkTask.Create([],False,procedure(task : ITask)
-//                    begin
-//                      aWorkerProc;
-//                    end);
-  fWorkerTask := TWorkTask.Create([],False,aWorkerProc);
-  fWorkerTask.Run;
-  Result := fWorkerTask;
-end;
-
 procedure TBackgroundWorkers.Start;
 procedure TBackgroundWorkers.Start;
 var
 var
   i : Integer;
   i : Integer;
   worker : TWorker;
   worker : TWorker;
+  task : IWorkTask;
 begin
 begin
   for i := 1 to fConcurrentWorkers do
   for i := 1 to fConcurrentWorkers do
   begin
   begin
-    worker := TSimpleWorker.Create(fWorkerTask,False);
+    task := TWorkTask.Create([],False,fWorkerExecuteProc)
+            .OnInitialize(fWorkerInitProc)
+            .OnRetry(fWorkerRetryProc)
+            .OnException(fWorkerExceptionProc)
+            .OnTerminated(fWorkerTerminateProc);
+    task.NumWorker := i;
+    task.Run;
+    worker := TSimpleWorker.Create(task,False);
+    fWorkerPool.Add(worker);
     worker.Start;
     worker.Start;
   end;
   end;
 end;
 end;
@@ -2377,4 +2394,72 @@ begin
   end;
   end;
 end;
 end;
 
 
+function TBackgroundWorkers.OnException(aTaskProc: TTaskExceptionProc): TBackgroundWorkers;
+begin
+  Result := Self;
+  fWorkerExceptionProc := aTaskProc;
+end;
+
+function TBackgroundWorkers.OnInitialize(aTaskProc: TTaskProc): TBackgroundWorkers;
+begin
+  Result := Self;
+  fWorkerInitProc := aTaskProc;
+end;
+
+function TBackgroundWorkers.OnRetry(aTaskProc: TTaskRetryProc): TBackgroundWorkers;
+begin
+  Result := Self;
+  fWorkerRetryProc := aTaskProc;
+end;
+
+function TBackgroundWorkers.OnTerminated(aTaskProc: TTaskProc): TBackgroundWorkers;
+begin
+  Result := Self;
+  fWorkerTerminateProc := aTaskProc;
+end;
+
+function TBackgroundWorkers.Retry(aMaxRetries: Integer): TBackgroundWorkers;
+begin
+  Result := Self;
+  SetRetryPolicy(aMaxRetries,0,1);
+end;
+
+function TBackgroundWorkers.RetryForever: TBackgroundWorkers;
+begin
+  Result := Self;
+  SetRetryPolicy(-1,0,1);
+end;
+
+procedure TBackgroundWorkers.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double);
+begin
+  fFaultPolicy.MaxRetries := aMaxRetries;
+  fFaultPolicy.WaitTimeBetweenRetries := aWaitTimeBetweenRetriesMS;
+  fFaultPolicy.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
+end;
+
+function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
+begin
+
+end;
+
+function TBackgroundWorkers.WaitAndRetry(aWaitTimeArray: TArray<Integer>): TBackgroundWorkers;
+begin
+
+end;
+
+function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
+begin
+
+end;
+
+function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
+begin
+
+end;
+
+function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
+begin
+
+end;
+
 end.
 end.