소스 검색

Sync methods added

TAnonymousThread, TBackgroundTasks & TScheduledTasks _Sync methods added
Unknown 6 년 전
부모
커밋
0b64636931
2개의 변경된 파일192개의 추가작업 그리고 30개의 파일을 삭제
  1. 184 28
      Quick.Threads.pas
  2. 8 2
      Quick.Value.pas

+ 184 - 28
Quick.Threads.pas

@@ -142,21 +142,38 @@ type
   TProc = procedure of object;
   {$ENDIF}
 
+  TAdvThread = class(TThread)
+  private
+    fExecuteProc : TProc;
+    fTerminateProc : TProc;
+    fExecuteWithSync : Boolean;
+    fTerminateWithSync : Boolean;
+    procedure DoExecute;
+    procedure CallToTerminate;
+  protected
+    procedure DoTerminate; override;
+  public
+    constructor Create(aProc : TProc; aSynchronize : Boolean);
+    procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
+    procedure Execute; override;
+  end;
+
   IAnonymousThread = interface
     procedure Start;
     function OnTerminate(aProc : TProc) : IAnonymousThread;
+    function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
   end;
 
   TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
   private
-    fThread : TThread;
-    fTerminateProc : TProc;
-    constructor Create(aProc : TProc);
-    procedure NotifyTerminate(Sender : TObject);
+    fThread : TAdvThread;
+    constructor Create(aProc : TProc; aSynchronize : Boolean);
   public
-    class function Execute(aProc : TProc) : IAnonymousThread;
+    class function Execute(aProc : TProc) : IAnonymousThread; overload;
+    class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
     procedure Start;
-    function OnTerminate(aProc : TProc) : IAnonymousThread;
+    function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
+    function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
   end;
 
   TParamArray = array of TFlexValue;
@@ -201,8 +218,11 @@ type
   IScheduledTask = interface(ITask)
   ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
     function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
+    function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
     function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
+    function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
     function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
+    function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
     function CheckSchedule : Boolean;
     procedure DoExpire;
     function GetTaskName : string;
@@ -228,6 +248,9 @@ type
     fTaskStatus : TWorkTaskStatus;
     fOwnedParams : Boolean;
     fEnabled : Boolean;
+    fExecuteWithSync : Boolean;
+    fExceptionWithSync : Boolean;
+    fTerminateWithSync : Boolean;
     function GetParam(aIndex : Integer) : TFlexValue;
     procedure DoExecute;
     procedure DoException(aException : Exception);
@@ -236,6 +259,10 @@ type
     procedure SetNumWorker(Value : Integer);
     function GetIdTask : Int64;
     procedure SetIdTask(Value : Int64);
+  protected
+    property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
+    property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
+    property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
   public
     constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
     destructor Destroy; override;
@@ -267,15 +294,21 @@ type
     fExpirationDate : TDateTime;
     fExpirationTimes : Integer;
     fFinished : Boolean;
+    fExpireWithSync: Boolean;
     procedure ClearSchedule;
     function CheckSchedule : Boolean;
     procedure DoExpire;
     function GetTaskName : string;
+  protected
+    property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
   public
     property Name : string read fName write fName;
     function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
+    function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
     function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
+    function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
     function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
+    function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
     function StartAt(aStartDate : TDateTime) : IScheduledTask;
     procedure RunOnce;
     procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
@@ -291,9 +324,12 @@ type
   private
     fNumWorker : Integer;
     fCurrentIdTask : Integer;
-    fCurrentTask : IWorkTask;
     fStatus : TWorkerStatus;
     fTaskQueue : TTaskQueue;
+    procedure ExecuteTask;
+    procedure TerminateTask;
+  protected
+    fCurrentTask : ITask;
   public
     constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
     property NumWorker : Integer read fNumWorker;
@@ -303,7 +339,7 @@ type
 
   TScheduledWorker = class(TWorker)
   private
-    fTask : IScheduledTask;
+    procedure ExpireTask;
   public
     constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
     procedure Execute; override;
@@ -331,7 +367,9 @@ type
     property NumPushedTasks : Int64 read fNumPushedTasks;
     property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
     function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
+    function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
     function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
+    function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
     procedure Start;
     procedure CancelAll;
   end;
@@ -344,6 +382,7 @@ type
     fCondVar : TSimpleEvent;
     fTaskList : TScheduledTaskList;
     fRemoveTaskAfterExpiration : Boolean;
+    procedure ExpireTask;
   public
     constructor Create(aTaskList : TScheduledTaskList);
     destructor Destroy; override;
@@ -368,7 +407,9 @@ type
     property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
     property IsStarted : Boolean read fIsStarted;
     function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
+    function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
     function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
+    function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
     function GetTask(aIdTask : Int64) : IScheduledTask; overload;
     function GetTask(const aTaskName : string) : IScheduledTask; overload;
     procedure Start;
@@ -837,30 +878,31 @@ end;
 
 { TAnonymousThread }
 
-constructor TAnonymousThread.Create(aProc : TProc);
+constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
 begin
-  {$IFNDEF FPC}
-  fThread := TThread.CreateAnonymousThread(aProc);
-  {$ELSE}
-  fThread := TThread.CreateAnonymousThread(@aProc);
-  {$ENDIF}
+  fThread := TAdvThread.Create(aProc,aSynchronize);
 end;
 
 class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
 begin
-  Result := TAnonymousThread.Create(aProc);
+  Result := TAnonymousThread.Create(aProc,False);
 end;
 
-procedure TAnonymousThread.NotifyTerminate(Sender: TObject);
+class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
 begin
-  fTerminateProc;
+  Result := TAnonymousThread.Create(aProc,True);
 end;
 
 function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
 begin
   Result := Self;
-  fTerminateProc := aProc;
-  fThread.OnTerminate := Self.NotifyTerminate;
+  fThread.OnTerminate(aProc,False);
+end;
+
+function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
+begin
+  Result := Self;
+  fThread.OnTerminate(aProc,True);
 end;
 
 procedure TAnonymousThread.Start;
@@ -875,6 +917,9 @@ var
   i : Integer;
 begin
   fTaskStatus := TWorkTaskStatus.wtsPending;
+  fExecuteWithSync := False;
+  fTerminateWithSync := False;
+  fExceptionWithSync := False;
   fOwnedParams := aOwnedParams;
   SetLength(fParamArray,High(aParamArray)+1);
   for i := Low(aParamArray) to High(aParamArray) do
@@ -1028,6 +1073,17 @@ begin
   end;
 end;
 
+function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
+begin
+  Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
+  TTask(Result).ExecuteWithSync := True;
+end;
+
+function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
+begin
+  Result := AddTask_Sync([],False,aTaskProc);
+end;
+
 procedure TBackgroundTasks.Start;
 var
   i : Integer;
@@ -1055,6 +1111,16 @@ begin
   FreeOnTerminate := False;
 end;
 
+procedure TWorker.ExecuteTask;
+begin
+  fCurrentTask.DoExecute;
+end;
+
+procedure TWorker.TerminateTask;
+begin
+  fCurrentTask.DoTerminate;
+end;
+
 procedure TWorker.Execute;
 begin
   fStatus := TWorkerStatus.wsIdle;
@@ -1066,7 +1132,8 @@ begin
       fStatus := TWorkerStatus.wsWorking;
       try
         fCurrentIdTask := fCurrentTask.GetIdTask;
-        fCurrentTask.DoExecute;
+        if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
+          else fCurrentTask.DoExecute;
       except
         on E : Exception do
         begin
@@ -1075,7 +1142,8 @@ begin
         end;
       end;
     finally
-      fCurrentTask.DoTerminate;
+      if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
+        else fCurrentTask.DoTerminate;
       fStatus := TWorkerStatus.wsIdle;
     end;
   end;
@@ -1101,6 +1169,17 @@ begin
   Result := scheduletask;
 end;
 
+function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
+begin
+  Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
+  TTask(Result).ExecuteWithSync := True;
+end;
+
+function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
+begin
+  Result := AddTask_Sync(aTaskName,aTaskProc);
+end;
+
 constructor TScheduledTasks.Create;
 begin
   fNumPushedTasks := 0;
@@ -1284,52 +1363,84 @@ begin
   Result := Self;
 end;
 
+function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
+begin
+  Result := OnException(aTaskProc);
+  TTask(Result).ExceptionWithSync := True;
+end;
+
 function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
 begin
   fExpiredProc := aTaskProc;
   Result := Self;
 end;
 
+function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
+begin
+  Result := OnExpired(aTaskProc);
+  TScheduledTask(Result).ExpireWithSync := True;
+end;
+
 function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
 begin
   fTerminateProc := aTaskProc;
   Result := Self;
 end;
 
+function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
+begin
+  Result := OnTerminated(aTaskProc);
+  TTask(Result).TerminateWithSync := True;
+end;
+
 { TScheduledWorker }
 
 constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
 begin
   inherited Create(aNumWorker,nil);
+  NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
   FreeOnTerminate := True;
-  fTask := aScheduledTask;
+  fCurrentTask := aScheduledTask;
 end;
 
 procedure TScheduledWorker.Execute;
 begin
   fStatus := TWorkerStatus.wsIdle;
-  if Assigned(fTask) then
+  if Assigned(fCurrentTask) then
   begin
     try
       fStatus := TWorkerStatus.wsWorking;
       try
-        fTask.DoExecute;
+        if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
+          else fCurrentTask.DoExecute;
         fStatus := TWorkerStatus.wsIdle;
       except
         on E : Exception do
         begin
-          if fTask <> nil then fTask.DoException(E)
+          if fCurrentTask <> nil then fCurrentTask.DoException(E)
             else raise Exception.Create(e.Message);
         end;
       end;
     finally
-      fTask.DoTerminate;
+      if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
+        else fCurrentTask.DoTerminate;
+      //check if expired
+      if (fCurrentTask as IScheduledTask).IsFinished then
+      begin
+        if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
+          else (fCurrentTask as IScheduledTask).DoExpire;
+      end;
     end;
   end;
-  fTask := nil;
+  fCurrentTask := nil;
   fStatus := TWorkerStatus.wsSuspended;
 end;
 
+procedure TScheduledWorker.ExpireTask;
+begin
+  (fCurrentTask as IScheduledTask).DoExpire;
+end;
+
 { TScheduler }
 
 constructor TScheduler.Create(aTaskList : TScheduledTaskList);
@@ -1381,7 +1492,8 @@ begin
         begin
           if task.IsEnabled then
           begin
-            task.DoExpire;
+            //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
+            //  else task.DoExpire;
             if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
           end;
         end;
@@ -1394,6 +1506,11 @@ begin
   end;
 end;
 
+procedure TScheduler.ExpireTask;
+begin
+
+end;
+
 function TScheduler.Add(aTask: TScheduledTask): Integer;
 begin
   Result := fTaskList.Add(aTask);
@@ -1429,4 +1546,43 @@ begin
   end;
 end;
 
+{ TAdvThread }
+
+constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
+begin
+  inherited Create(True);
+  FreeOnTerminate := True;
+  fExecuteWithSync := aSynchronize;
+  fExecuteProc := aProc;
+end;
+
+procedure TAdvThread.DoExecute;
+begin
+  if Assigned(fExecuteProc) then fExecuteProc;
+end;
+
+procedure TAdvThread.CallToTerminate;
+begin
+  if Assigned(fTerminateProc) then fTerminateProc;
+end;
+
+procedure TAdvThread.DoTerminate;
+begin
+  if fTerminateWithSync then Synchronize(CallToTerminate)
+    else CallToTerminate;
+end;
+
+procedure TAdvThread.Execute;
+begin
+  if fExecuteWithSync then Synchronize(DoExecute)
+    else DoExecute;
+end;
+
+
+procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
+begin
+  fTerminateWithSync := aSynchronize;
+  fTerminateProc := aProc;
+end;
+
 end.

+ 8 - 2
Quick.Value.pas

@@ -7,7 +7,7 @@
   Author      : Kike Pérez
   Version     : 1.4
   Created     : 07/01/2019
-  Modified    : 15/01/2019
+  Modified    : 16/01/2019
 
   This file is part of QuickLib: https://github.com/exilon/QuickLib
 
@@ -176,7 +176,11 @@ type
 
   TFlexValue = record
   private
+    {$IFNDEF FPC}
     fDataIntf : IInterface;
+    {$ELSE}
+    fDataIntf : TValueData;
+    {$ENDIF}
     fDataType : TValueDataType;
     function CastToString : string;
     {$IFNDEF NEXTGEN}
@@ -533,7 +537,9 @@ begin
     vtPointer : AsPointer := Value.VPointer;
     else raise Exception.Create('DataType not supported by TFlexValue');
   end;
-  //fDataIntf._AddRef;
+  {$IFDEF FPC}
+  fDataIntf._AddRef;
+  {$ENDIF}
 end;
 
 {$IFNDEF FPC}