Просмотр исходного кода

TBackgroundTasks & TScheduledTasks added

Unknown 6 лет назад
Родитель
Сommit
f51d4f4856
1 измененных файлов с 806 добавлено и 65 удалено
  1. 806 65
      Quick.Threads.pas

+ 806 - 65
Quick.Threads.pas

@@ -1,13 +1,13 @@
 { ***************************************************************************
 { ***************************************************************************
 
 
-  Copyright (c) 2016-2018 Kike Pérez
+  Copyright (c) 2016-2019 Kike Pérez
 
 
   Unit        : Quick.Threads
   Unit        : Quick.Threads
   Description : Thread safe collections
   Description : Thread safe collections
   Author      : Kike Pérez
   Author      : Kike Pérez
-  Version     : 1.2
+  Version     : 1.4
   Created     : 09/03/2018
   Created     : 09/03/2018
-  Modified    : 19/12/2018
+  Modified    : 14/01/2019
 
 
   This file is part of QuickLib: https://github.com/exilon/QuickLib
   This file is part of QuickLib: https://github.com/exilon/QuickLib
 
 
@@ -35,9 +35,12 @@ interface
 
 
 uses
 uses
   Classes,
   Classes,
+  //rtti,
   Types,
   Types,
   SysUtils,
   SysUtils,
+  DateUtils,
   //Quick.Chrono,
   //Quick.Chrono,
+  Quick.Value,
   {$IFNDEF FPC}
   {$IFNDEF FPC}
   System.RTLConsts,
   System.RTLConsts,
   System.Generics.Collections,
   System.Generics.Collections,
@@ -66,7 +69,6 @@ type
   public
   public
     constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
     constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
     destructor Destroy; override;
     destructor Destroy; override;
-
     procedure Grow(ADelta: Integer);
     procedure Grow(ADelta: Integer);
     function PushItem(const AItem: T): TWaitResult; overload;
     function PushItem(const AItem: T): TWaitResult; overload;
     function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
     function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
@@ -75,7 +77,7 @@ type
     function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
     function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
     function PopItem(var AItem: T): TWaitResult; overload;
     function PopItem(var AItem: T): TWaitResult; overload;
     procedure DoShutDown;
     procedure DoShutDown;
-
+    procedure Clear;
     property QueueSize: Integer read FQueueSize;
     property QueueSize: Integer read FQueueSize;
     property ShutDown: Boolean read FShutDown;
     property ShutDown: Boolean read FShutDown;
     property TotalItemsPushed: Cardinal read FTotalItemsPushed;
     property TotalItemsPushed: Cardinal read FTotalItemsPushed;
@@ -100,7 +102,6 @@ type
   public
   public
     constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
     constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
     destructor Destroy; override;
     destructor Destroy; override;
-
     procedure Grow(ADelta: Integer);
     procedure Grow(ADelta: Integer);
     function PushItem(const AItem: T): TWaitResult; overload;
     function PushItem(const AItem: T): TWaitResult; overload;
     function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
     function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
@@ -109,32 +110,12 @@ type
     function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
     function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
     function PopItem(var AItem: T): TWaitResult; overload;
     function PopItem(var AItem: T): TWaitResult; overload;
     procedure DoShutDown;
     procedure DoShutDown;
-
     property QueueSize: Integer read FQueueSize;
     property QueueSize: Integer read FQueueSize;
     property ShutDown: Boolean read FShutDown;
     property ShutDown: Boolean read FShutDown;
     property TotalItemsPushed: Cardinal read FTotalItemsPushed;
     property TotalItemsPushed: Cardinal read FTotalItemsPushed;
     property TotalItemsPopped: Cardinal read FTotalItemsPopped;
     property TotalItemsPopped: Cardinal read FTotalItemsPopped;
   end;
   end;
 
 
-  TThreadTask<T> = class(TThread)
-  private
-    fMaxQueue : Integer;
-    fInsertTimeout : Integer;
-    fExtractTimeout : Integer;
-    fTaskQueue : TThreadedQueueCS<T>;
-    function GetTaskQueue : Cardinal;
-  public
-    constructor Create;
-    destructor Destroy; override;
-    property MaxQueue : Integer read fMaxQueue write fMaxQueue;
-    property InsertTimeout : Integer read fInsertTimeout write fInsertTimeout;
-    property ExtractTimeout : Integer read fExtractTimeout write fExtractTimeout;
-    property TaskQueue : Cardinal read GetTaskQueue;
-    procedure Execute; override;
-    function AddTask(Task : T) : Boolean;
-    procedure Start;
-  end;
-
   {$IFNDEF FPC}
   {$IFNDEF FPC}
   TThreadObjectList<T: class> = class(TList<T>)
   TThreadObjectList<T: class> = class(TList<T>)
     private
     private
@@ -178,10 +159,247 @@ type
     function OnTerminate(aProc : TProc) : IAnonymousThread;
     function OnTerminate(aProc : TProc) : IAnonymousThread;
   end;
   end;
 
 
+  TParamArray = array of TFlexValue;
+
+  TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
+
+  TScheduleMode = (smRunOnce, smRepeatMode);
+
+  TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds);
+
+  ITask = interface
+  ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
+    function GetParam(aIndex : Integer) : TFlexValue;
+    function TaskStatus : TWorkTaskStatus;
+    function GetNumWorker : Integer;
+    procedure SetNumWorker(Value : Integer);
+    function GetIdTask : Int64;
+    procedure SetIdTask(Value : Int64);
+    procedure DoExecute;
+    procedure DoException(aException : Exception);
+    procedure DoTerminate;
+    property Param[index : Integer] : TFlexValue read GetParam;
+    property NumWorker : Integer read GetNumWorker write SetNumWorker;
+    property IdTask : Int64 read GetIdTask;
+    function IsEnabled : Boolean;
+  end;
+
+  {$IFNDEF FPC}
+  TTaskProc = reference to procedure(task : ITask);
+  TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
+  {$ELSE}
+  TTaskProc = procedure(task : ITask) of object;
+  TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
+  {$ENDIF}
+
+  IWorkTask = interface(ITask)
+    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
+    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
+    procedure Run;
+  end;
+
+  IScheduledTask = interface(ITask)
+  ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
+    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
+    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
+    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
+    function CheckSchedule : Boolean;
+    procedure DoExpire;
+    function GetTaskName : string;
+    function StartAt(aStartDate : TDateTime) : IScheduledTask;
+    procedure RunOnce;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
+    function IsFinished : Boolean;
+    procedure Cancel;
+    property Name : string read GetTaskName;
+  end;
+
+  TTask = class(TInterfacedObject,ITask)
+  private
+    fIdTask : Int64;
+    fNumWorker : Integer;
+    fParamArray : TParamArray;
+    fExecuteProc : TTaskProc;
+    fExceptProc : TTaskExceptionProc;
+    fTerminateProc : TTaskProc;
+    fExpiredProc : TTaskProc;
+    fTaskStatus : TWorkTaskStatus;
+    fOwnedParams : Boolean;
+    fEnabled : Boolean;
+    function GetParam(aIndex : Integer) : TFlexValue;
+    procedure DoExecute;
+    procedure DoException(aException : Exception);
+    procedure DoTerminate;
+    function GetNumWorker : Integer;
+    procedure SetNumWorker(Value : Integer);
+    function GetIdTask : Int64;
+    procedure SetIdTask(Value : Int64);
+  public
+    constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
+    destructor Destroy; override;
+    property IdTask : Int64 read GetIdTask;
+    property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
+    function IsEnabled : Boolean;
+    function TaskStatus : TWorkTaskStatus;
+  end;
+
+  TWorkTask = class(TTask,IWorkTask)
+  public
+    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
+    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
+    procedure Run; virtual;
+  end;
+
+  TTaskQueue = TThreadedQueueCS<IWorkTask>;
+
+  TScheduledTask = class(TTask,IScheduledTask)
+  private
+    fName : string;
+    fExecutionTimes : Integer;
+    fScheduleMode : TScheduleMode;
+    fTimeInterval : Integer;
+    fTimeMeasure : TTimeMeasure;
+    fStartDate : TDateTime;
+    fLastExecution : TDateTime;
+    fNextExecution : TDateTime;
+    fExpirationDate : TDateTime;
+    fExpirationTimes : Integer;
+    fFinished : Boolean;
+    procedure ClearSchedule;
+    function CheckSchedule : Boolean;
+    procedure DoExpire;
+    function GetTaskName : string;
+  public
+    property Name : string read fName write fName;
+    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
+    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
+    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
+    function StartAt(aStartDate : TDateTime) : IScheduledTask;
+    procedure RunOnce;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
+    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
+    function IsFinished : Boolean;
+    procedure Cancel;
+  end;
+
+  TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
+
+  TWorker = class(TThread)
+  private
+    fNumWorker : Integer;
+    fCurrentIdTask : Integer;
+    fCurrentTask : IWorkTask;
+    fStatus : TWorkerStatus;
+    fTaskQueue : TTaskQueue;
+  public
+    constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
+    property NumWorker : Integer read fNumWorker;
+    property Status : TWorkerStatus read fStatus;
+    procedure Execute; override;
+  end;
+
+  TScheduledWorker = class(TWorker)
+  private
+    fTask : IScheduledTask;
+  public
+    constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
+    procedure Execute; override;
+  end;
+
+  TWorkerPool = TObjectList<TWorker>;
+
+  TBackgroundTasks = class
+  private
+    fMaxQueue : Integer;
+    fWorkerPool : TWorkerPool;
+    fConcurrentWorkers : Integer;
+    fInsertTimeout : Cardinal;
+    fExtractTimeout : Cardinal;
+    fTaskQueue : TTaskQueue;
+    fNumPushedTasks : Int64;
+    function GetTaskQueue : Cardinal;
+  public
+    constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
+    destructor Destroy; override;
+    property MaxQueue : Integer read fMaxQueue;
+    property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
+    property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
+    property TaskQueued : Cardinal read GetTaskQueue;
+    property NumPushedTasks : Int64 read fNumPushedTasks;
+    property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
+    function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
+    function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
+    procedure Start;
+    procedure CancelAll;
+  end;
+
+  TScheduledTaskList = TList<IScheduledTask>;
+
+  TScheduler = class(TThread)
+  private
+    fListLock : TCriticalSection;
+    fCondVar : TSimpleEvent;
+    fTaskList : TScheduledTaskList;
+    fRemoveTaskAfterExpiration : Boolean;
+  public
+    constructor Create(aTaskList : TScheduledTaskList);
+    destructor Destroy; override;
+    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
+    procedure Execute; override;
+    function Add(aTask : TScheduledTask) : Integer;
+    function Get(aIdTask : Int64) : IScheduledTask; overload;
+    function Get(const aTaskName : string) : IScheduledTask; overload;
+  end;
+
+  TScheduledTasks = class
+  private
+    fTaskList : TScheduledTaskList;
+    fScheduler : TScheduler;
+    fNumPushedTasks : Int64;
+    fRemoveTaskAfterExpiration : Boolean;
+    fIsStarted : Boolean;
+  public
+    constructor Create;
+    destructor Destroy; override;
+    property NumPushedTasks : Int64 read fNumPushedTasks;
+    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
+    property IsStarted : Boolean read fIsStarted;
+    function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
+    function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
+    function GetTask(aIdTask : Int64) : IScheduledTask; overload;
+    function GetTask(const aTaskName : string) : IScheduledTask; overload;
+    procedure Start;
+    procedure Stop;
+  end;
+
 implementation
 implementation
 
 
 { TThreadedQueueCS<T> }
 { TThreadedQueueCS<T> }
 
 
+procedure TThreadedQueueCS<T>.Clear;
+var
+  obj : T;
+begin
+  FQueueLock.Enter;
+  try
+    for obj in FQueue do
+    begin
+      if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
+    end;
+    SetLength(FQueue,0);
+  finally
+    FQueueLock.Leave;
+  end;
+  {$IFDEF FPC}
+  FQueueCondVar.SetEvent;
+  {$ELSE}
+  FQueueCondVar.ReleaseAll;
+  {$ENDIF}
+end;
+
 constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
 constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
 begin
 begin
   inherited Create;
   inherited Create;
@@ -526,44 +744,6 @@ begin
   {$ENDIF}
   {$ENDIF}
 end;
 end;
 
 
-{ TThreadTask<T> }
-
-function TThreadTask<T>.AddTask(Task: T): Boolean;
-begin
-  Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled;
-end;
-
-constructor TThreadTask<T>.Create;
-begin
-  inherited Create(True);
-  fMaxQueue := 10;
-  fInsertTimeout := INFINITE;
-  fExtractTimeout := INFINITE;
-end;
-
-destructor TThreadTask<T>.Destroy;
-begin
-  if Assigned(fTaskQueue) then fTaskQueue.Free;
-  inherited;
-end;
-
-procedure TThreadTask<T>.Execute;
-begin
-  inherited;
-
-end;
-
-function TThreadTask<T>.GetTaskQueue: Cardinal;
-begin
-  if Assigned(fTaskQueue) then Result := fTaskQueue.QueueSize
-    else Result := 0;
-end;
-
-procedure TThreadTask<T>.Start;
-begin
-  fTaskQueue := TThreadedQueueCS<T>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
-end;
-
 {$IFNDEF FPC}
 {$IFNDEF FPC}
 { TThreadObjectList<T> }
 { TThreadObjectList<T> }
 
 
@@ -655,7 +835,7 @@ begin
 end;
 end;
 {$ENDIF}
 {$ENDIF}
 
 
-{ TThreadEx }
+{ TAnonymousThread }
 
 
 constructor TAnonymousThread.Create(aProc : TProc);
 constructor TAnonymousThread.Create(aProc : TProc);
 begin
 begin
@@ -688,4 +868,565 @@ begin
   fThread.Start;
   fThread.Start;
 end;
 end;
 
 
+{ TTask }
+
+constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
+var
+  i : Integer;
+begin
+  fTaskStatus := TWorkTaskStatus.wtsPending;
+  fOwnedParams := aOwnedParams;
+  SetLength(fParamArray,High(aParamArray)+1);
+  for i := Low(aParamArray) to High(aParamArray) do
+  begin
+    fParamArray[i].Create(aParamArray[i]);
+    {$IFDEF FPC}
+    fParamArray[i]._AddRef;
+    {$ENDIF}
+  end;
+  fExecuteProc := aTaskProc;
+  fEnabled := False;
+end;
+
+destructor TTask.Destroy;
+var
+  i : Integer;
+begin
+  //free passed params
+  if fOwnedParams then
+  begin
+    for i := Low(fParamArray) to High(fParamArray) do
+    begin
+      {$IFDEF FPC}
+      fParamArray[i]._Release;
+      {$ENDIF}
+      if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
+    end;
+  end;
+  fParamArray := nil;
+  inherited;
+end;
+
+procedure TTask.DoException(aException : Exception);
+begin
+  fTaskStatus := TWorkTaskStatus.wtsException;
+  if Assigned(fExceptProc) then fExceptProc(Self,aException)
+    else raise aException;
+end;
+
+procedure TTask.DoExecute;
+begin
+  fTaskStatus := TWorkTaskStatus.wtsRunning;
+  if Assigned(fExecuteProc) then fExecuteProc(Self);
+  fTaskStatus := TWorkTaskStatus.wtsDone;
+end;
+
+procedure TTask.DoTerminate;
+begin
+  if Assigned(fTerminateProc) then fTerminateProc(Self);
+end;
+
+function TTask.GetIdTask: Int64;
+begin
+  Result := fIdTask;
+end;
+
+procedure TTask.SetIdTask(Value : Int64);
+begin
+  fIdTask := Value;
+end;
+
+function TTask.GetNumWorker: Integer;
+begin
+  Result := fNumWorker;
+end;
+
+function TTask.GetParam(aIndex: Integer): TFlexValue;
+begin
+  Result := fParamArray[aIndex];
+end;
+
+function TTask.IsEnabled: Boolean;
+begin
+  Result := fEnabled;
+end;
+
+procedure TTask.SetNumWorker(Value: Integer);
+begin
+  fTaskStatus := TWorkTaskStatus.wtsAssigned;
+  fNumWorker := Value;
+end;
+
+function TTask.TaskStatus: TWorkTaskStatus;
+begin
+  Result := fTaskStatus;
+end;
+
+{ TWorkTask }
+
+function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
+begin
+  fExceptProc := aTaskProc;
+  Result := Self;
+end;
+
+function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
+begin
+  fTerminateProc := aTaskProc;
+  Result := Self;
+end;
+
+procedure TWorkTask.Run;
+begin
+  fEnabled := True;
+end;
+
+{ TBackgroundTasks }
+
+procedure TBackgroundTasks.CancelAll;
+begin
+  fTaskQueue.Clear;
+end;
+
+constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
+begin
+  fMaxQueue := aMaxQueue;
+  fConcurrentWorkers := aConcurrentWorkers;
+  fInsertTimeout := INFINITE;
+  fExtractTimeout := 5000;
+  fNumPushedTasks := 0;
+  fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
+end;
+
+destructor TBackgroundTasks.Destroy;
+begin
+  if Assigned(fWorkerPool) then fWorkerPool.Free;
+  if Assigned(fTaskQueue) then fTaskQueue.Free;
+  inherited;
+end;
+
+function TBackgroundTasks.GetTaskQueue: Cardinal;
+begin
+  Result := fTaskQueue.QueueSize;
+end;
+
+function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
+begin
+  Result := AddTask([],False,aTaskProc);
+end;
+
+function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
+var
+  worktask : IWorkTask;
+begin
+  worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
+  Inc(fNumPushedTasks);
+  worktask.SetIdTask(fNumPushedTasks);
+  if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
+  begin
+    Result := worktask;
+  end;
+end;
+
+procedure TBackgroundTasks.Start;
+var
+  i : Integer;
+  worker : TWorker;
+begin
+  //create workers
+  if fWorkerPool <> nil then fWorkerPool.Free;
+  fWorkerPool := TWorkerPool.Create(True);
+  for i := 1 to fConcurrentWorkers do
+  begin
+    worker := TWorker.Create(i,fTaskQueue);
+    fWorkerPool.Add(worker);
+    worker.Start;
+  end;
+end;
+
+{ TWorker }
+
+constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
+begin
+  inherited Create(True);
+  fNumWorker := aNumWorker;
+  fStatus := TWorkerStatus.wsSuspended;
+  fTaskQueue := aTaskQueue;
+  FreeOnTerminate := False;
+end;
+
+procedure TWorker.Execute;
+begin
+  fStatus := TWorkerStatus.wsIdle;
+  while (not Terminated) and (fTaskQueue.QueueSize > 0) do
+  begin
+    fCurrentTask := fTaskQueue.PopItem;
+    if fCurrentTask <> nil then
+    try
+      fStatus := TWorkerStatus.wsWorking;
+      try
+        fCurrentIdTask := fCurrentTask.GetIdTask;
+        fCurrentTask.DoExecute;
+      except
+        on E : Exception do
+        begin
+          if fCurrentTask <> nil then fCurrentTask.DoException(E)
+            else raise Exception.Create(e.Message);
+        end;
+      end;
+    finally
+      fCurrentTask.DoTerminate;
+      fStatus := TWorkerStatus.wsIdle;
+    end;
+  end;
+  fStatus := TWorkerStatus.wsSuspended
+end;
+
+{ TScheduledTasks }
+
+function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
+begin
+  Result := AddTask(aTaskName,[],False,aTaskProc);
+end;
+
+function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
+var
+  scheduletask : TScheduledTask;
+begin
+  scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
+  scheduletask.Name := aTaskName;
+  Inc(fNumPushedTasks);
+  scheduletask.SetIdTask(fNumPushedTasks);
+  fTaskList.Add(scheduletask);
+  Result := scheduletask;
+end;
+
+constructor TScheduledTasks.Create;
+begin
+  fNumPushedTasks := 0;
+  fIsStarted := False;
+  fTaskList := TScheduledTaskList.Create;
+end;
+
+destructor TScheduledTasks.Destroy;
+begin
+  if Assigned(fScheduler) then
+  begin
+    fScheduler.Terminate;
+    fScheduler.WaitFor;
+    fScheduler.Free;
+  end;
+  if Assigned(fTaskList) then fTaskList.Free;
+  inherited;
+end;
+
+function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
+begin
+  Result := fScheduler.Get(aIdTask);
+end;
+
+function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
+begin
+  Result := fScheduler.Get(aTaskName);
+end;
+
+procedure TScheduledTasks.Start;
+begin
+  if fIsStarted then Exit;
+  if not Assigned(fScheduler) then
+  begin
+    fScheduler := TScheduler.Create(fTaskList);
+    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
+  end;
+  fScheduler.Start;
+  fIsStarted := True;
+end;
+
+procedure TScheduledTasks.Stop;
+begin
+  if Assigned(fScheduler) then fScheduler.Terminate;
+  fIsStarted := False;
+end;
+
+{ TScheduledTask }
+
+function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
+begin
+  Result := Self;
+  ClearSchedule;
+  fScheduleMode := TScheduleMode.smRunOnce;
+  fStartDate := aStartDate;
+  fNextExecution := aStartDate;
+end;
+
+procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
+begin
+  if fStartDate = 0.0 then ClearSchedule;
+  fScheduleMode := TScheduleMode.smRepeatMode;
+  fTimeMeasure := aTimeMeasure;
+  fTimeInterval := aInterval;
+  if fStartDate = 0.0 then fStartDate := Now();
+  fNextExecution := fStartDate;
+  fEnabled := True;
+end;
+
+procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
+begin
+  if fStartDate = 0.0 then ClearSchedule;
+  fScheduleMode := TScheduleMode.smRepeatMode;
+  fTimeMeasure := aTimeMeasure;
+  fTimeInterval := aInterval;
+  if fStartDate = 0.0 then fStartDate := Now();
+  fExpirationDate := aEndTime;
+  fNextExecution := fStartDate;
+  fEnabled := True;
+end;
+
+procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
+begin
+  if fStartDate = 0.0 then ClearSchedule;
+  fScheduleMode := TScheduleMode.smRepeatMode;
+  fTimeMeasure := aTimeMeasure;
+  fTimeInterval := aInterval;
+  if fStartDate = 0.0 then fStartDate := Now();
+  fExpirationTimes := aRepeatTimes;
+  fNextExecution := fStartDate;
+  fEnabled := True;
+end;
+
+procedure TScheduledTask.RunOnce;
+begin
+  ClearSchedule;
+  fScheduleMode := TScheduleMode.smRunOnce;
+  if fStartDate = 0.0 then fStartDate := Now();
+  fNextExecution := fStartDate;
+  fEnabled := True;
+end;
+
+procedure TScheduledTask.Cancel;
+begin
+  fFinished := True;
+end;
+
+function TScheduledTask.CheckSchedule: Boolean;
+begin
+  Result := False;
+  if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
+
+  if fScheduleMode = TScheduleMode.smRunOnce then
+  begin
+    //if start date reached
+    if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
+    begin
+      fLastExecution := Now();
+      Inc(fExecutionTimes);
+      Result := True;
+    end;
+  end
+  else
+  begin
+    //if next execution reached
+    if Now() >= fNextExecution then
+    begin
+      //check expiration limits
+      if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
+         ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
+      begin
+        fFinished := True;
+        Exit;
+      end;
+      //calculate next execution
+      case fTimeMeasure of
+        tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
+        tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
+        tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
+        tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
+      end;
+      fLastExecution := Now();
+      Inc(fExecutionTimes);
+      Result := True;
+    end;
+  end;
+end;
+
+procedure TScheduledTask.ClearSchedule;
+begin
+  inherited;
+  fFinished := False;
+  fStartDate := 0.0;
+  fLastExecution := 0.0;
+  fNextExecution := 0.0;
+  fExpirationDate := 0.0;
+  fScheduleMode := TScheduleMode.smRunOnce;
+  fTimeMeasure := TTimeMeasure.tmSeconds;
+  fTimeInterval := 0;
+end;
+
+procedure TScheduledTask.DoExpire;
+begin
+  if Assigned(fExpiredProc) then fExpiredProc(Self);
+  fEnabled := False;
+end;
+
+function TScheduledTask.GetTaskName: string;
+begin
+  Result := fName;
+end;
+
+function TScheduledTask.IsFinished: Boolean;
+begin
+  Result := fFinished;
+end;
+
+function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
+begin
+  fExceptProc := aTaskProc;
+  Result := Self;
+end;
+
+function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
+begin
+  fExpiredProc := aTaskProc;
+  Result := Self;
+end;
+
+function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
+begin
+  fTerminateProc := aTaskProc;
+  Result := Self;
+end;
+
+{ TScheduledWorker }
+
+constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
+begin
+  inherited Create(aNumWorker,nil);
+  FreeOnTerminate := True;
+  fTask := aScheduledTask;
+end;
+
+procedure TScheduledWorker.Execute;
+begin
+  fStatus := TWorkerStatus.wsIdle;
+  if Assigned(fTask) then
+  begin
+    try
+      fStatus := TWorkerStatus.wsWorking;
+      try
+        fTask.DoExecute;
+        fStatus := TWorkerStatus.wsIdle;
+      except
+        on E : Exception do
+        begin
+          if fTask <> nil then fTask.DoException(E)
+            else raise Exception.Create(e.Message);
+        end;
+      end;
+    finally
+      fTask.DoTerminate;
+    end;
+  end;
+  fTask := nil;
+  fStatus := TWorkerStatus.wsSuspended;
+end;
+
+{ TScheduler }
+
+constructor TScheduler.Create(aTaskList : TScheduledTaskList);
+begin
+  inherited Create(True);
+  FreeOnTerminate := False;
+  fListLock := TCriticalSection.Create;
+  fRemoveTaskAfterExpiration := False;
+  fTaskList := aTaskList;
+  {$IFDEF FPC}
+  fCondVar := TSimpleEvent.Create;
+  {$ELSE}
+  fCondVar := TSimpleEvent.Create(nil,True,False,'');
+  {$ENDIF}
+end;
+
+destructor TScheduler.Destroy;
+begin
+  fCondVar.SetEvent;
+  fCondVar.Free;
+  fTaskList := nil;
+  fListLock.Free;
+  inherited;
+end;
+
+procedure TScheduler.Execute;
+var
+  task : IScheduledTask;
+  worker : TScheduledWorker;
+  numworker : Int64;
+begin
+  numworker := 0;
+  while not Terminated do
+  begin
+    fListLock.Enter;
+    try
+      for task in fTaskList do
+      begin
+        if not task.IsFinished then
+        begin
+          if task.CheckSchedule then
+          begin
+            Inc(numworker);
+            worker := TScheduledWorker.Create(numworker,task);
+            worker.Start;
+          end;
+        end
+        else
+        begin
+          if task.IsEnabled then
+          begin
+            task.DoExpire;
+            if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
+          end;
+        end;
+      end;
+      task := nil;
+    finally
+      fListLock.Leave;
+    end;
+    fCondVar.WaitFor(250);
+  end;
+end;
+
+function TScheduler.Add(aTask: TScheduledTask): Integer;
+begin
+  Result := fTaskList.Add(aTask);
+end;
+
+function TScheduler.Get(aIdTask: Int64): IScheduledTask;
+var
+  task : IScheduledTask;
+begin
+  fListLock.Enter;
+  try
+    for task in fTaskList do
+    begin
+      if task.IdTask = aIdTask then Exit(task);
+    end;
+  finally
+    fListLock.Leave;
+  end;
+end;
+
+function TScheduler.Get(const aTaskName: string): IScheduledTask;
+var
+  task : IScheduledTask;
+begin
+  fListLock.Enter;
+  try
+    for task in fTaskList do
+    begin
+      if CompareText(task.Name,aTaskName) = 0 then Exit(task);
+    end;
+  finally
+    fListLock.Leave;
+  end;
+end;
+
 end.
 end.