瀏覽代碼

[threads] new Background workers

Exilon 4 年之前
父節點
當前提交
30fc1b8cb3
共有 1 個文件被更改,包括 77 次插入7 次删除
  1. 77 7
      Quick.Threads.pas

+ 77 - 7
Quick.Threads.pas

@@ -1,13 +1,13 @@
 { ***************************************************************************
 
-  Copyright (c) 2016-2020 Kike Pérez
+  Copyright (c) 2016-2021 Kike Pérez
 
   Unit        : Quick.Threads
   Description : Thread safe collections
   Author      : Kike Pérez
   Version     : 1.5
   Created     : 09/03/2018
-  Modified    : 27/06/2020
+  Modified    : 08/02/2021
 
   This file is part of QuickLib: https://github.com/exilon/QuickLib
 
@@ -485,8 +485,10 @@ type
   end;
 
   TSimpleWorker = class(TWorker)
+  private
+    fRunOnce : Boolean;
   public
-    constructor Create(aTask : ITask);
+    constructor Create(aTask : ITask; aRunOnce : Boolean = True);
     procedure Execute; override;
   end;
 
@@ -589,6 +591,20 @@ type
     procedure Stop;
   end;
 
+  TBackgroundWorkers = class
+  private
+    fWorkerPool : TWorkerPool;
+    fConcurrentWorkers : Integer;
+    fWorkerTask : IWorkTask;
+  public
+    constructor Create(aConcurrentWorkers : Integer);
+    destructor Destroy; override;
+    property ConcurrentWorkers : Integer read fConcurrentWorkers;
+    function OnExecute(aWorkerProc : TTaskProc) : IWorkTask;
+    procedure Start;
+    procedure Stop;
+  end;
+
 implementation
 
 { TThreadedQueueCS<T> }
@@ -1571,9 +1587,10 @@ end;
 
 { TSimpleWorker }
 
-constructor TSimpleWorker.Create(aTask : ITask);
+constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
 begin
   inherited Create;
+  fRunOnce := aRunOnce;
   fCurrentTask := aTask;
   FreeOnTerminate := True;
 end;
@@ -1587,6 +1604,7 @@ begin
     try
       fStatus := TWorkerStatus.wsWorking;
       try
+        SetFaultPolicy(TTask(fCurrentTask));
         if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
           else fCurrentTask.DoExecute;
       except
@@ -1597,10 +1615,14 @@ begin
         end;
       end;
     finally
-      if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
-        else fCurrentTask.DoTerminate;
       fStatus := TWorkerStatus.wsIdle;
-      Terminate;
+      if fRunOnce then Terminate;
+      try
+        if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
+          else fCurrentTask.DoTerminate;
+      except
+        on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
+      end;
     end;
   end;
   fStatus := TWorkerStatus.wsSuspended
@@ -2307,4 +2329,52 @@ begin
   inherited;
 end;
 
+{ TBackgroundWorkers }
+
+constructor TBackgroundWorkers.Create(aConcurrentWorkers: Integer);
+begin
+  fConcurrentWorkers := aConcurrentWorkers;
+end;
+
+destructor TBackgroundWorkers.Destroy;
+begin
+  fWorkerPool.Free;
+  inherited;
+end;
+
+function TBackgroundWorkers.OnExecute(aWorkerProc: TTaskProc): IWorkTask;
+begin
+//  fWorkerTask := TWorkTask.Create([],False,procedure(task : ITask)
+//                    begin
+//                      aWorkerProc;
+//                    end);
+  fWorkerTask := TWorkTask.Create([],False,aWorkerProc);
+  fWorkerTask.Run;
+  Result := fWorkerTask;
+end;
+
+procedure TBackgroundWorkers.Start;
+var
+  i : Integer;
+  worker : TWorker;
+begin
+  for i := 1 to fConcurrentWorkers do
+  begin
+    worker := TSimpleWorker.Create(fWorkerTask,False);
+    worker.Start;
+  end;
+end;
+
+procedure TBackgroundWorkers.Stop;
+var
+  worker : TWorker;
+begin
+  for worker in fWorkerPool do
+  begin
+    worker.Terminate;
+    worker.WaitFor;
+    fWorkerPool.Remove(worker);
+  end;
+end;
+
 end.