Răsfoiți Sursa

* No wait loop, use event

Michaël Van Canneyt 3 ani în urmă
părinte
comite
c632736197

+ 1 - 1
packages/fcl-base/examples/testthreadpool.pp

@@ -90,7 +90,7 @@ begin
   MyPool:=TFPSimpleThreadPool.Create;
   try
     MyPool.AddTimeout:=40;
-    MyPool.AutoCheckQueuedTasks:=True;
+    MyPool.AutoCheckQueuedInterval:=50;
     // RunTest(MyPool);
     RunTest(TFPSimpleThreadPool.Instance);
   finally

+ 66 - 38
packages/fcl-base/src/fpthreadpool.pp

@@ -9,9 +9,9 @@ Uses Classes, SysUtils, DateUtils, SyncObjs;
 
 
 Const
-  DefaultAddWaitInterval = 50;
-  DefaultAddTimeOut      = 5000;
+  DefaultAddTimeOut      = 0;
   DefaultQueueTasks      = True;
+  DefaultTerminateWait   = 50;
 
 Type
   EThreadPool = Class(Exception);
@@ -142,7 +142,10 @@ Type
        { TAbstractThreadList }
 
        TAbstractThreadList = class(TObject)
-         Constructor CreateList; virtual; abstract;
+       private
+         FPool: TFPCustomSimpleThreadPool;
+       public
+         Constructor CreateList(aPool : TFPCustomSimpleThreadPool); virtual;
          // Return a thread ready to execute task.
          Function GetAvailableThread : TAbstractTaskThread; virtual; abstract;
          // Add thread which must execute task
@@ -155,6 +158,8 @@ Type
          Function GetIdleThreadCount : Word; virtual; abstract;
          // Terminate all treads.
          Procedure TerminateThreads; virtual; abstract;
+         // Our pool
+         Property Pool : TFPCustomSimpleThreadPool Read FPool;
        end;
 
        { TTaskThread }
@@ -162,11 +167,12 @@ Type
        TTaskThread = Class(TAbstractTaskThread)
        Private
          FTaskEvent : TEventObject;
+         FTaskDoneEvent : TNotifyEvent;
          FWaitInterval : Integer;
        Protected
          procedure TerminatedSet; override;
        Public
-         Constructor create(aWaitInterval : Integer; CreateSuspended : Boolean; aOnTerminate : TNotifyEvent); virtual;
+         Constructor create(aWaitInterval : Integer; aTaskDoneEvent,aOnTerminate : TNotifyEvent); virtual;
          Destructor Destroy; override;
          procedure DoSetTask(AValue: TThreadPoolTask); override;
          procedure Execute; override;
@@ -179,9 +185,10 @@ Type
        private
          FThreadTaskWaitInterval: Integer;
          FList : TThreadList;
+         procedure TaskDone(Sender: TObject);
          Procedure ThreadTerminated(Sender : TObject);
        public
-         Constructor CreateList; override;
+         Constructor CreateList(aPool : TFPCustomSimpleThreadPool); override;
          Procedure TerminateThreads; override;
          Function GetAvailableThread : TAbstractTaskThread; override;
          Function AddThread : TAbstractTaskThread; override;
@@ -195,19 +202,19 @@ Type
     class var _Instance : TFPCustomSimpleThreadPool;
     class var _DefaultInstanceClass : TFPCustomSimpleThreadPoolClass;
   private
-    FAutoCheckQueuedTasks: Boolean;
+    FAutoCheckQueuedInterval: Integer;
     FMaxThreads: Word;
     FMinThreads: Word;
     FAddTimeout: Cardinal;
-    FAddWaitInterval: Cardinal;
     FQueueTasks: Boolean;
     FWaitQueueLock : TCriticalSection;
     FTaskQueueLock : TCriticalSection;
     FTaskList : TAbstractThreadList;
     FWaitQueue : TThreadList;
     FAutoCheckQueueThread : TAutoCheckQueueThread;
+    FThreadDoneEvent : TEventObject;
     class function GetInstance: TFPCustomSimpleThreadPool; static;
-    procedure SetAutoCheckQueuedTasks(AValue: Boolean);
+    procedure SetAutoCheckQueuedInterval(AValue: Integer);
     class procedure SetDefaultInstanceClass(AValue: TFPCustomSimpleThreadPoolClass); static;
     procedure SetMaxThreads(AValue: Word);
     procedure SetMinThreads(AValue: Word);
@@ -236,9 +243,7 @@ Type
     Property MinThreads : Word Read FMinThreads Write SetMinThreads;
     // Max number of threads
     Property MaxThreads : Word Read FMaxThreads Write SetMaxThreads;
-    // Wait interval in milliseconds when adding task and checking for an available thread
-    Property AddWaitInterval : Cardinal Read FAddWaitInterval Write FAddWaitInterval;
-    // Queue timeout in milliseconds when adding task. Set to Zero to wait forever.
+    // when QueueTasks is False, Timeout in milliseconds to wait for thread to become available. Set to Zero to not wait.
     Property AddTimeout : Cardinal Read FAddTimeout Write FAddTimeout;
     // Set QueueTasks to add the tasks to a queue if they cannot be executed within the AddTimeout interval
     Property QueueTasks : Boolean Read FQueueTasks Write FQueueTasks;
@@ -248,8 +253,8 @@ Type
     Property IdleThreadCount : Word Read GetIdleThreadCount;
     // Number of threads
     Property ThreadCount : Word Read GetThreadCount;
-    // Set to true to start a thread that runs the CheckQueuedTasks
-    Property AutoCheckQueuedTasks : Boolean Read FAutoCheckQueuedTasks Write SetAutoCheckQueuedTasks;
+    // Set to a value>0  to start a thread that runs the CheckQueuedTasks every AutoCheckQueuedInterval ms.
+    Property AutoCheckQueuedInterval : Integer Read FAutoCheckQueuedInterval Write SetAutoCheckQueuedInterval;
   Public
     constructor Create; virtual;
     destructor destroy; override;
@@ -274,13 +279,12 @@ Type
   Public
     Property MinThreads;
     Property MaxThreads;
-    Property AddWaitInterval;
     Property AddTimeout;
     Property QueueTasks;
     Property BusyThreadCount;
     Property IdleThreadCount;
     Property ThreadCount;
-    Property AutoCheckQueuedTasks;
+    Property AutoCheckQueuedInterval;
   end;
 
 Implementation
@@ -303,6 +307,14 @@ Procedure DoLog(Const Fmt : String; Const Args : Array of const);
 begin
   DoLog(Format(Fmt,Args))
 end;
+
+{ TFPCustomSimpleThreadPool.TAbstractThreadList }
+
+constructor TFPCustomSimpleThreadPool.TAbstractThreadList.CreateList(aPool: TFPCustomSimpleThreadPool);
+begin
+  FPool:=aPool;
+end;
+
 {$ENDIF}
 
 { TFPCustomSimpleThreadPool.TAutoCheckQueueThread }
@@ -373,12 +385,13 @@ begin
   inherited TerminatedSet;
 end;
 
-constructor TFPCustomSimpleThreadPool.TTaskThread.create(aWaitInterval : Integer; CreateSuspended: Boolean; aOnTerminate : TNotifyEvent);
+constructor TFPCustomSimpleThreadPool.TTaskThread.create(aWaitInterval : Integer; aTaskDoneEvent,aOnTerminate : TNotifyEvent);
 begin
   FTaskEvent:=TEventObject.Create(Nil,False,False,'');
+  FTaskDoneEvent:=aTaskDoneEvent;
   FWaitInterval:=aWaitInterval;
   OnTerminate:=aOnTerminate;
-  inherited create(CreateSuspended);
+  inherited create(False);
 end;
 
 destructor TFPCustomSimpleThreadPool.TTaskThread.Destroy;
@@ -406,6 +419,8 @@ begin
           Task.Execute;
         finally
           FreeTask;
+          if Assigned(FTaskDoneEvent) then
+            FTaskDoneEvent(Self);
         end;
       end;
     end;
@@ -418,8 +433,16 @@ begin
   FList.Remove(Sender);
 end;
 
-constructor TFPCustomSimpleThreadPool.TThreadPoolList.CreateList;
+procedure TFPCustomSimpleThreadPool.TThreadPoolList.TaskDone(Sender: TObject);
 begin
+  If Assigned(Pool) and Assigned(Pool.FThreadDoneEvent) then
+    Pool.FThreadDoneEvent.SetEvent;
+
+end;
+
+constructor TFPCustomSimpleThreadPool.TThreadPoolList.CreateList(aPool : TFPCustomSimpleThreadPool);
+begin
+  Inherited;
   FList:=TThreadList.Create;
 end;
 
@@ -443,7 +466,7 @@ end;
 
 function TFPCustomSimpleThreadPool.TThreadPoolList.AddThread: TAbstractTaskThread;
 begin
-  Result:=TTaskThread.Create(FThreadTaskWaitInterval,False,@ThreadTerminated);
+  Result:=TTaskThread.Create(FThreadTaskWaitInterval,@TaskDone,@ThreadTerminated);
   FList.Add(Result);
 end;
 
@@ -618,16 +641,16 @@ end;
 
 Function TFPCustomSimpleThreadPool.CreateAutoCheckQueueThread :TAutoCheckQueueThread;
 begin
-  Result:=TAutoCheckQueueThread.Create(Self,AddWaitInterval);
+  Result:=TAutoCheckQueueThread.Create(Self,AutoCheckQueuedInterval);
 end;
 
-procedure TFPCustomSimpleThreadPool.SetAutoCheckQueuedTasks(AValue: Boolean);
+procedure TFPCustomSimpleThreadPool.SetAutoCheckQueuedInterval(AValue: Integer);
 begin
   FWaitQueueLock.Enter;
   try
-    if FAutoCheckQueuedTasks=AValue then Exit;
-    FAutoCheckQueuedTasks:=AValue;
-    if FAutoCheckQueuedTasks then
+    if FAutoCheckQueuedInterval=AValue then Exit;
+    FAutoCheckQueuedInterval:=AValue;
+    if FAutoCheckQueuedInterval=0 then
       begin
       if Assigned(FAutoCheckQueueThread) then
         begin
@@ -691,19 +714,22 @@ Var
   L : TList;
 
 begin
+  L:=nil;
   FWaitQueueLock.Enter;
   try
     L:=FWaitQueue.LockList;
     While (L.Count>0) and DoAddTask(TThreadPoolTask(L[L.Count-1])) do
       L.Delete(L.Count-1);
   finally
+    if Assigned(L) then
+      FWaitQueue.UnlockList;
     FWaitQueueLock.Leave;
   end;
 end;
 
 function TFPCustomSimpleThreadPool.AddTaskToQueue(aTask: TThreadPoolTask): Boolean;
 begin
-  {$IFDEF DEBUGTHREADPOOL} DoLog(('Adding task '+aTask.ToString+' to queue');{$ENDIF}
+  {$IFDEF DEBUGTHREADPOOL} DoLog('Adding task '+aTask.ToString+' to queue');{$ENDIF}
   FWaitQueueLock.Enter;
   try
     FWaitQueue.Add(aTask);
@@ -720,6 +746,7 @@ Var
   L : TList;
   aTask : TThreadPoolTask;
 begin
+  L:=Nil;
   FWaitQueueLock.Enter;
   try
     L:=FWaitQueue.LockList;
@@ -731,6 +758,8 @@ begin
       aTask.Free;
       end;
   finally
+    if Assigned(L) then
+      FWaitQueue.UnlockList;
     FWaitQueueLock.Leave;
   end;
 end;
@@ -739,20 +768,20 @@ procedure TFPCustomSimpleThreadPool.DoTerminateRunningTasks(DoWait: Boolean);
 begin
   {$IFDEF DEBUGTHREADPOOL}DoLog('Terminating all threads');{$ENDIF}
   FTaskList.TerminateThreads;
-  {$IFDEF DEBUGTHREADPOOL}DoLog('Terminated all threads, wait: %s',BoolToStr(DoWait,True);{$ENDIF}
+  {$IFDEF DEBUGTHREADPOOL}DoLog('Terminated all threads, wait: '+BoolToStr(DoWait,True));{$ENDIF}
   if DoWait then
     begin
     While FTaskList.GetBusyThreadCount>0 do
       begin
-      {$IFDEF DEBUGTHREADPOOL}DoLog('Not all threads terminated, wait: %d',[FAddWaitInterval]);{$ENDIF}
-      Sleep(FAddWaitInterval);
+      {$IFDEF DEBUGTHREADPOOL}DoLog('Not all threads terminated, wait: %d',[DefaultTerminateWait]);{$ENDIF}
+      Sleep(DefaultTerminateWait);
       end;
     end;
 end;
 
 function TFPCustomSimpleThreadPool.CreateThreadList: TAbstractThreadList;
 begin
-  Result:=TThreadPoolList.CreateList;
+  Result:=TThreadPoolList.CreateList(Self);
 end;
 
 function TFPCustomSimpleThreadPool.DoAddTask(aTask: TThreadPoolTask) : Boolean;
@@ -778,16 +807,14 @@ begin
     Result:=T<>Nil;
     if Result then
       T.Task:=aTask
-    else
+    else if (Not QueueTasks) and (FAddTimeout>0) then
       begin
-      {$IFDEF DEBUGTHREADPOOL}DoLog('No available thread for task %s waiting %d to %d',[aTask.ToString,FAddWaitInterval,FAddTimeOut]);{$ENDIF}
-      if WaitStart=0 then
-        WaitStart:=Now;
-      Sleep(FAddWaitInterval);
-      TimeOut:=(FAddTimeOut>0) and (MillisecondsBetween(Now,WaitStart)>FAddTimeout);
+      {$IFDEF DEBUGTHREADPOOL}DoLog('No available thread for task %s waiting %d ms',[aTask.ToString,FAddTimeOut]);{$ENDIF}
+      FThreadDoneEvent.ResetEvent;
+      Timeout:=FThreadDoneEvent.WaitFor(FAddTimeout)<>wrSignaled;
       {$IFDEF DEBUGTHREADPOOL}
       If TimeOut then
-        DoLog('TimeOut reached: ',TimeOut);
+        DoLog('TimeOut reached: '+BoolToStr(TimeOut,True));
       {$ENDIF}
       end;
   Until Result or TimeOut;
@@ -795,7 +822,6 @@ end;
 
 constructor TFPCustomSimpleThreadPool.Create;
 begin
-  FAddWaitInterval:=DefaultAddWaitInterval;
   FAddTimeout:=DefaultAddTimeout;
   FWaitQueueLock:=TCriticalSection.Create;
   FTaskQueueLock:=TCriticalSection.Create;
@@ -804,12 +830,13 @@ begin
   MaxThreads:=TThread.ProcessorCount;
   MinThreads:=TThread.ProcessorCount-1;
   FQueueTasks:=DefaultQueueTasks;
+  FThreadDoneEvent:=TEventObject.Create(Nil,False,False,'');
 end;
 
 destructor TFPCustomSimpleThreadPool.destroy;
 begin
   // Disable the queue
-  AutoCheckQueuedTasks:=False;
+  AutoCheckQueuedInterval:=0;
   {$IFDEF DEBUGTHREADPOOL}DoLog('Destroy : Cancelqueuedtasks');{$ENDIF}
   CancelQueuedTasks;
   {$IFDEF DEBUGTHREADPOOL}DoLog('Destroy : TerminateRunningTasks');{$ENDIF}
@@ -818,6 +845,7 @@ begin
   FreeAndNil(FTaskList);
   FreeAndNil(FWaitQueueLock);
   FreeAndNil(FTaskQueueLock);
+  FreeAndNil(FThreadDoneEvent);
   inherited destroy;
 end;