12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279 |
- { ***************************************************************************
- Copyright (c) 2016-2019 Kike Pérez
- Unit : Quick.Threads
- Description : Thread safe collections
- Author : Kike Pérez
- Version : 1.5
- Created : 09/03/2018
- Modified : 07/09/2019
- This file is part of QuickLib: https://github.com/exilon/QuickLib
- ***************************************************************************
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- *************************************************************************** }
- unit Quick.Threads;
- {$i QuickLib.inc}
- interface
- uses
- Classes,
- Types,
- SysUtils,
- DateUtils,
- Quick.Commons,
- //Quick.Chrono,
- Quick.Value,
- Quick.FaultControl,
- {$IFNDEF FPC}
- System.RTLConsts,
- System.Generics.Collections,
- System.SyncObjs;
- {$ELSE}
- RtlConsts,
- Generics.Collections,
- syncobjs;
- {$ENDIF}
- type
- TThreadedQueueCS<T> = class
- private
- FQueue: array of T;
- FQueueSize, FQueueOffset: Integer;
- FQueueLock: TCriticalSection;
- {$IFDEF FPC}
- FQueueCondVar : TEventObject;
- {$ELSE}
- FQueueCondVar: TConditionVariableCS;
- {$ENDIF}
- FShutDown: Boolean;
- FPushTimeout, FPopTimeout: Cardinal;
- FTotalItemsPushed, FTotalItemsPopped: Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- procedure Clear;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- TThreadedQueueList<T> = class
- private
- fQueue : TQueue<T>;
- fQueueSize : Integer;
- fQueueLock : TCriticalSection;
- {$IFDEF FPC}
- FQueueCondVar : TSimpleEvent;
- {$ELSE}
- FQueueCondVar: TConditionVariableCS;
- {$ENDIF}
- fShutDown : Boolean;
- fPushTimeout : Cardinal;
- fPopTimeout : Cardinal;
- fTotalItemsPushed : Cardinal;
- fTotalItemsPopped : Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- {$IFNDEF FPC}
- TThreadObjectList<T: class> = class(TList<T>)
- private
- fList: TObjectList<T>;
- fLock: TObject;
- fDuplicates: TDuplicates;
- function GetItem(aIndex : Integer) : T;
- procedure SetItem(aIndex : Integer; aValue : T);
- public
- constructor Create(OwnedObjects : Boolean);
- destructor Destroy; override;
- property Items[Index : Integer] : T read GetItem write SetItem ; default;
- procedure Add(const Item: T);
- procedure Clear;
- function LockList: TObjectList<T>;
- procedure Remove(const Item: T); inline;
- procedure RemoveItem(const Item: T; Direction: TDirection);
- procedure UnlockList; inline;
- property Duplicates: TDuplicates read fDuplicates write fDuplicates;
- end;
- {$ENDIF}
- {$IFNDEF FPC}
- TAnonExceptionProc = reference to procedure(aException : Exception);
- TAnonProc = TProc;
- {$ELSE}
- TProc = procedure of object;
- TAnonExceptionProc = procedure(aException : Exception) of object;
- {$ENDIF}
- TThreadWorkStatus = (wsRunning, wsDone, wsException);
- TAdvThread = class(TThread)
- private
- fExecuteProc : TProc;
- fExceptionProc : TAnonExceptionProc;
- fTerminateProc : TProc;
- fExecuteWithSync : Boolean;
- fTerminateWithSync : Boolean;
- procedure DoExecute;
- procedure CallToTerminate;
- protected
- procedure DoTerminate; override;
- public
- constructor Create(aProc : TProc; aSynchronize : Boolean);
- procedure OnException(aProc : TAnonExceptionProc);
- procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
- procedure Execute; override;
- end;
- IAnonymousThread = interface
- procedure Start;
- function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
- function OnTerminate(aProc : TProc) : IAnonymousThread;
- function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
- end;
- TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
- private
- fThread : TAdvThread;
- constructor Create(aProc : TProc; aSynchronize : Boolean);
- public
- class function Execute(aProc : TProc) : IAnonymousThread; overload;
- class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
- procedure Start;
- function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
- function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
- function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
- end;
- TParamValue = class
- private
- fName : string;
- fValue : TFlexValue;
- fOwned : Boolean;
- public
- constructor Create; overload;
- constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
- constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
- destructor Destroy; override;
- property Name : string read fName write fName;
- property Value : TFlexValue read fValue write fValue;
- property Owned : Boolean read fOwned write fOwned;
- end;
- TParamList = TObjectList<TParamValue>;
- TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
- TScheduleMode = (smRunOnce, smRepeatMode);
- TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
- ETaskAddError = class(Exception);
- ETaskInitializationError = class(Exception);
- ETaskExecutionError = class(Exception);
- ETaskParamError = class(Exception);
- ETaskSchedulerError = class(Exception);
- ITask = interface
- ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
- function GetParam(aIndex : Integer) : TFlexValue; overload;
- function GetParam(const aName : string) : TFlexValue; overload;
- function GetParam2(aIndex : Integer) : PFlexValue;
- procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
- procedure SetParam(const aName : string; Value : TFlexValue); overload;
- function TaskStatus : TWorkTaskStatus;
- function GetNumWorker : Integer;
- procedure SetNumWorker(Value : Integer);
- function GetIdTask : Int64;
- procedure SetIdTask(Value : Int64);
- function GetResult : TFlexValue;
- procedure SetResult(aValue : TFlexValue);
- procedure DoExecute;
- procedure DoException(aException : Exception);
- procedure DoTerminate;
- {$IFNDEF FPC}
- property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
- property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
- {$ELSE}
- property Param[index : Integer] : TFlexValue read GetParam write SetParam;
- property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
- {$ENDIF}
- property NumWorker : Integer read GetNumWorker write SetNumWorker;
- property Result : TFlexValue read GetResult write SetResult;
- property IdTask : Int64 read GetIdTask;
- function Done : Boolean;
- function Failed : Boolean;
- function NumRetries : Integer;
- function MaxRetries : Integer;
- function LastException : Exception;
- function CircuitBreaked : Boolean;
- function IsEnabled : Boolean;
- end;
- {$IFNDEF FPC}
- TTaskProc = reference to procedure(task : ITask);
- TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
- TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
- {$ELSE}
- TTaskProc = procedure(task : ITask) of object;
- TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
- TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
- {$ENDIF}
- IWorkTask = interface(ITask)
- function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
- function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
- function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
- function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
- function Retry(aMaxRetries : Integer) : IWorkTask;
- function RetryForever : IWorkTask;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
- function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
- function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
- function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
- procedure Run;
- end;
- IScheduledTask = interface(ITask)
- ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
- function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
- function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
- function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
- function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
- function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
- function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
- function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
- function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
- function Retry(aMaxRetries : Integer) : IScheduledTask;
- function RetryForever : IScheduledTask;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
- function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
- function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
- function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
- function CheckSchedule : Boolean;
- procedure DoExpire;
- function GetTaskName : string;
- function StartAt(aStartDate : TDateTime) : IScheduledTask;
- function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- function StartOnDayChange : IScheduledTask;
- function StartNow : IScheduledTask;
- function StartInMinutes(aMinutes : Word) : IScheduledTask;
- function StartInSeconds(aSeconds : Word) : IScheduledTask;
- procedure RunOnce;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
- procedure RepeatEveryDay;
- procedure RepeatEveryWeek;
- function IsFinished : Boolean;
- procedure Cancel;
- property Name : string read GetTaskName;
- function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
- end;
- TTask = class(TInterfacedObject,ITask)
- private
- fIdTask : Int64;
- fNumWorker : Integer;
- fNumRetries : Integer;
- fParamList : TParamList;
- fInitializeProc : TTaskProc;
- fExecuteProc : TTaskProc;
- fExceptProc : TTaskExceptionProc;
- fTerminateProc : TTaskProc;
- fExpiredProc : TTaskProc;
- fTaskStatus : TWorkTaskStatus;
- fOwnedParams : Boolean;
- fEnabled : Boolean;
- fExecuteWithSync : Boolean;
- fExceptionWithSync : Boolean;
- fRetryProc : TTaskRetryProc;
- fTerminateWithSync : Boolean;
- fFaultControl : TFaultControl;
- fCustomFaultPolicy : Boolean;
- fResult : TFlexValue;
- function GetParam(aIndex : Integer) : TFlexValue; overload;
- function GetParam(const aName : string) : TFlexValue; overload;
- function GetParam2(aIndex : Integer) : PFlexValue;
- procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
- procedure SetParam(const aName : string; Value : TFlexValue); overload;
- procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
- procedure DoInitialize;
- procedure DoExecute;
- procedure DoException(aException : Exception);
- procedure DoTerminate;
- function GetNumWorker : Integer;
- procedure SetNumWorker(Value : Integer);
- function GetIdTask : Int64;
- procedure SetIdTask(Value : Int64);
- function GetResult : TFlexValue;
- procedure SetResult(aValue : TFlexValue);
- protected
- property FaultControl : TFaultControl read fFaultControl write fFaultControl;
- property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
- property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
- property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
- property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
- procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
- procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
- procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
- procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
- public
- constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
- destructor Destroy; override;
- property IdTask : Int64 read GetIdTask;
- property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
- function IsEnabled : Boolean;
- function TaskStatus : TWorkTaskStatus;
- function Done : Boolean;
- function Failed : Boolean;
- function NumRetries : Integer;
- function MaxRetries : Integer;
- function LastException : Exception;
- function CircuitBreaked : Boolean;
- end;
- TWorkTask = class(TTask,IWorkTask)
- public
- function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
- function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
- function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
- function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
- function Retry(aMaxRetries : Integer) : IWorkTask;
- function RetryForever : IWorkTask;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
- function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
- function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
- function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
- procedure Run; virtual;
- end;
- TTaskQueue = TThreadedQueueCS<IWorkTask>;
- TScheduledTask = class(TTask,IScheduledTask)
- private
- fName : string;
- fExecutionTimes : Integer;
- fScheduleMode : TScheduleMode;
- fTimeInterval : Integer;
- fTimeMeasure : TTimeMeasure;
- fStartDate : TDateTime;
- fLastExecution : TDateTime;
- fNextExecution : TDateTime;
- fExpirationDate : TDateTime;
- fExpirationTimes : Integer;
- fFinished : Boolean;
- fExpireWithSync: Boolean;
- procedure ClearSchedule;
- function CheckSchedule : Boolean;
- procedure DoExpire;
- function GetTaskName : string;
- function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
- protected
- property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
- public
- property Name : string read fName write fName;
- function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
- property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
- function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
- function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
- function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
- function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function StartAt(aStartDate : TDateTime) : IScheduledTask;
- function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- function StartOnDayChange : IScheduledTask;
- function StartNow : IScheduledTask;
- function StartInMinutes(aMinutes : Word) : IScheduledTask;
- function StartInSeconds(aSeconds : Word) : IScheduledTask;
- procedure RunOnce;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
- procedure RepeatEveryDay;
- procedure RepeatEveryWeek;
- function Retry(aMaxRetries : Integer) : IScheduledTask;
- function RetryForever : IScheduledTask;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
- function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
- function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
- function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
- function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
- function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
- function IsFinished : Boolean;
- procedure Cancel;
- end;
- TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
- TWorker = class(TThread)
- protected
- fStatus : TWorkerStatus;
- fCurrentTask : ITask;
- fDefaultFaultPolicy : TFaultPolicy;
- procedure ExecuteTask;
- procedure TerminateTask;
- public
- constructor Create;
- destructor Destroy; override;
- property Status : TWorkerStatus read fStatus;
- procedure SetFaultPolicy(aTask : TTask);
- procedure Execute; override;
- end;
- TSimpleWorker = class(TWorker)
- public
- constructor Create(aTask : ITask);
- procedure Execute; override;
- end;
- TQueueWorker = class(TWorker)
- private
- fCurrentIdTask : Integer;
- fNumWorker : Integer;
- fTaskQueue : TTaskQueue;
- public
- constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
- property NumWorker : Integer read fNumWorker;
- procedure Execute; override;
- end;
- TScheduledWorker = class(TWorker)
- private
- procedure ExpireTask;
- public
- constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
- procedure Execute; override;
- end;
- TWorkerPool = TObjectList<TWorker>;
- TRunTask = class
- public
- class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
- class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
- class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
- class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
- end;
- TBackgroundTasks = class
- private
- fMaxQueue : Integer;
- fWorkerPool : TWorkerPool;
- fConcurrentWorkers : Integer;
- fInsertTimeout : Cardinal;
- fExtractTimeout : Cardinal;
- fTaskQueue : TTaskQueue;
- fNumPushedTasks : Int64;
- function GetTaskQueue : Cardinal;
- public
- constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
- destructor Destroy; override;
- property MaxQueue : Integer read fMaxQueue write fMaxQueue;
- property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
- property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
- property TaskQueued : Cardinal read GetTaskQueue;
- property NumPushedTasks : Int64 read fNumPushedTasks;
- property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
- function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
- function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
- function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
- function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
- procedure Start;
- procedure CancelAll;
- end;
- TScheduledTaskList = TList<IScheduledTask>;
- TScheduler = class(TThread)
- private
- fListLock : TCriticalSection;
- fCondVar : TSimpleEvent;
- fTaskList : TScheduledTaskList;
- fRemoveTaskAfterExpiration : Boolean;
- public
- constructor Create(aTaskList : TScheduledTaskList);
- destructor Destroy; override;
- property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
- procedure Execute; override;
- function Add(aTask : TScheduledTask) : Integer;
- function Get(aIdTask : Int64) : IScheduledTask; overload;
- function Get(const aTaskName : string) : IScheduledTask; overload;
- end;
- TScheduledTasks = class
- private
- fTaskList : TScheduledTaskList;
- fScheduler : TScheduler;
- fNumPushedTasks : Int64;
- fRemoveTaskAfterExpiration : Boolean;
- fIsStarted : Boolean;
- fFaultPolicy : TFaultPolicy;
- public
- constructor Create;
- destructor Destroy; override;
- property NumPushedTasks : Int64 read fNumPushedTasks;
- property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
- property IsStarted : Boolean read fIsStarted;
- property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
- function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
- function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
- function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
- function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
- function GetTask(aIdTask : Int64) : IScheduledTask; overload;
- function GetTask(const aTaskName : string) : IScheduledTask; overload;
- procedure Start;
- procedure Stop;
- end;
- implementation
- { TThreadedQueueCS<T> }
- procedure TThreadedQueueCS<T>.Clear;
- var
- obj : T;
- begin
- FQueueLock.Enter;
- try
- for obj in FQueue do
- begin
- if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
- end;
- SetLength(FQueue,0);
- finally
- FQueueLock.Leave;
- end;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
- inherited Create;
- if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
- SetLength(FQueue, AQueueDepth);
- FQueueLock := TCriticalSection.Create;
- {$IFDEF FPC}
- FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
- {$ELSE}
- FQueueCondVar := TConditionVariableCS.Create;
- {$ENDIF}
- FPushTimeout := PushTimeout;
- FPopTimeout := PopTimeout;
- end;
- destructor TThreadedQueueCS<T>.Destroy;
- begin
- DoShutDown;
- FQueueLock.Free;
- FQueueCondVar.Free;
- inherited;
- end;
- procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
- begin
- FQueueLock.Enter;
- try
- SetLength(FQueue, Length(FQueue) + ADelta);
- finally
- FQueueLock.Leave;
- end;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- function TThreadedQueueCS<T>.PopItem: T;
- var
- LQueueSize: Integer;
- begin
- PopItem(LQueueSize, Result);
- end;
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
- begin
- {$IFDEF FPC}
- Result := FQueueCondVar.WaitFor(FPopTimeout);
- {$ELSE}
- Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
- {$ENDIF}
- end;
- if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := FQueue[FQueueOffset];
- FQueue[FQueueOffset] := Default(T);
- if FQueueSize = Length(FQueue) then
- begin
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- Dec(FQueueSize);
- Inc(FQueueOffset);
- Inc(FTotalItemsPopped);
- if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PopItem(LQueueSize, AItem);
- end;
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PushItem(AItem, LQueueSize);
- end;
- function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- if FQueueSize >= High(FQueue) then
- begin
- if FQueueSize < 512 then Grow(FQueueSize * 2)
- else Grow(10);
- end;
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
- begin
- {$IFDEF FPC}
- Result := FQueueCondVar.WaitFor(FPushTimeout);
- {$ELSE}
- Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
- {$ENDIF}
- end;
- if FShutDown or (Result <> wrSignaled) then Exit;
- if FQueueSize = 0 then
- begin
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
- Inc(FQueueSize);
- Inc(FTotalItemsPushed);
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- procedure TThreadedQueueCS<T>.DoShutDown;
- begin
- FShutDown := True;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- { TThreadedQueueList<T> }
- constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
- inherited Create;
- fQueue := TQueue<T>.Create;
- fQueue.Capacity := AQueueDepth;
- fQueueSize := 0;
- fQueueLock := TCriticalSection.Create;
- {$IFDEF FPC}
- FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
- {$ELSE}
- fQueueCondVar := TConditionVariableCS.Create;
- {$ENDIF}
- fPushTimeout := PushTimeout;
- fPopTimeout := PopTimeout;
- end;
- destructor TThreadedQueueList<T>.Destroy;
- begin
- DoShutDown;
- fQueueLock.Free;
- fQueueCondVar.Free;
- fQueue.Free;
- inherited;
- end;
- procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
- begin
- fQueueLock.Enter;
- try
- fQueue.Capacity := fQueue.Capacity + ADelta;
- finally
- fQueueLock.Leave;
- end;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- function TThreadedQueueList<T>.PopItem: T;
- var
- LQueueSize: Integer;
- begin
- PopItem(LQueueSize, Result);
- end;
- {$IFDEF FPC}
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- //var
- //crono : TChronometer;
- begin
- AItem := Default(T);
- //crono := TChronometer.Create(False);
- try
- Result := wrSignaled;
- //writeln('popitem');
- //crono.Start;
- while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
- begin
- //crono.Start;
- Result := FQueueCondVar.WaitFor(FPopTimeout);
- //crono.Stop;
- //writeln('in: ' + crono.ElapsedTime);
- //if result = twaitresult.wrError then result := twaitresult.wrError;
- end;
- //crono.Stop;
- //writeln('out: ' + crono.ElapsedTime);
- fQueueLock.Enter;
- try
- if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := fQueue.Extract;
- Dec(FQueueSize);
- Inc(fTotalItemsPopped);
- finally
- fQueueLock.Leave;
- end;
- finally
- AQueueSize := fQueueSize;
- end;
- end;
- {$ELSE}
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- fQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
- begin
- Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
- end;
- if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := fQueue.Extract;
- if fQueueSize = fQueue.Count then
- begin
- FQueueCondVar.ReleaseAll;
- end;
- Dec(FQueueSize);
- Inc(fTotalItemsPopped);
- finally
- AQueueSize := fQueueSize;
- fQueueLock.Leave;
- end;
- end;
- {$ENDIF}
- function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PopItem(LQueueSize, AItem);
- end;
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PushItem(AItem, LQueueSize);
- end;
- {$IFDEF FPC}
- function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- if FQueueSize >= High(FQueue) then Grow(10);
- Result := wrSignaled;
- //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
- //begin
- // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
- //end;
- if fShutDown or (Result <> wrSignaled) then Exit;
- //if fQueueSize = 0 then
- //begin
- // FQueueCondVar.SetEvent;
- //end;
- fQueue.Enqueue(AItem);
- Inc(FQueueSize);
- Inc(fTotalItemsPushed);
- finally
- AQueueSize := fQueueSize;
- FQueueLock.Leave;
- //FQueueCondVar.SetEvent;
- end;
- end;
- {$ELSE}
- function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
- //begin
- // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
- //end;
- if fShutDown or (Result <> wrSignaled) then Exit;
- if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
- fQueue.Enqueue(AItem);
- Inc(FQueueSize);
- Inc(fTotalItemsPushed);
- finally
- AQueueSize := fQueueSize;
- FQueueLock.Leave;
- end;
- end;
- {$ENDIF}
- procedure TThreadedQueueList<T>.DoShutDown;
- begin
- fShutDown := True;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- {$IFNDEF FPC}
- { TThreadObjectList<T> }
- procedure TThreadObjectList<T>.Add(const Item: T);
- begin
- LockList;
- try
- if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
- else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
- finally
- UnlockList;
- end;
- end;
- procedure TThreadObjectList<T>.Clear;
- begin
- LockList;
- try
- fList.Clear;
- finally
- UnlockList;
- end;
- end;
- constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
- begin
- inherited Create;
- fLock := TObject.Create;
- fList := TObjectList<T>.Create;
- fDuplicates := dupIgnore;
- end;
- destructor TThreadObjectList<T>.Destroy;
- begin
- LockList;
- try
- fList.Free;
- inherited Destroy;
- finally
- UnlockList;
- fLock.Free;
- end;
- end;
- function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
- begin
- LockList;
- try
- Result := fList[aIndex];
- finally
- UnlockList;
- end;
- end;
- function TThreadObjectList<T>.LockList: TObjectList<T>;
- begin
- System.TMonitor.Enter(fLock);
- Result := fList;
- end;
- procedure TThreadObjectList<T>.Remove(const Item: T);
- begin
- RemoveItem(Item, TDirection.FromBeginning);
- end;
- procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
- begin
- LockList;
- try
- fList.RemoveItem(Item, Direction);
- finally
- UnlockList;
- end;
- end;
- procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
- begin
- LockList;
- try
- fList[aIndex] := aValue;
- finally
- UnlockList;
- end;
- end;
- procedure TThreadObjectList<T>.UnlockList;
- begin
- System.TMonitor.Exit(fLock);
- end;
- {$ENDIF}
- { TAnonymousThread }
- constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
- begin
- fThread := TAdvThread.Create(aProc,aSynchronize);
- end;
- class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
- begin
- Result := TAnonymousThread.Create(aProc,False);
- end;
- class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
- begin
- Result := TAnonymousThread.Create(aProc,True);
- end;
- function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
- begin
- Result := Self;
- fThread.OnException(aProc);
- end;
- function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
- begin
- Result := Self;
- fThread.OnTerminate(aProc,False);
- end;
- function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
- begin
- Result := Self;
- fThread.OnTerminate(aProc,True);
- end;
- procedure TAnonymousThread.Start;
- begin
- fThread.Start;
- end;
- { TTask }
- constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
- var
- i : Integer;
- begin
- fTaskStatus := TWorkTaskStatus.wtsPending;
- fCustomFaultPolicy := False;
- fNumRetries := 0;
- fExecuteWithSync := False;
- fTerminateWithSync := False;
- fExceptionWithSync := False;
- fFaultControl := TFaultControl.Create;
- fFaultControl.OnRetry := DoRetry;
- fOwnedParams := aOwnedParams;
- fParamList := TParamList.Create(True);
- for i := Low(aParamArray) to High(aParamArray) do
- begin
- fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
- {$IFDEF FPC}
- fParamList[i].Value._AddRef;
- {$ENDIF}
- end;
- fExecuteProc := aTaskProc;
- fEnabled := False;
- end;
- destructor TTask.Destroy;
- var
- i : Integer;
- begin
- fFaultControl.Free;
- //free passed params
- fParamList.Free;
- if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
- inherited;
- end;
- procedure TTask.DoException(aException : Exception);
- begin
- fTaskStatus := TWorkTaskStatus.wtsException;
- if Assigned(fExceptProc) then fExceptProc(Self,aException)
- else raise aException;
- end;
- procedure TTask.DoExecute;
- begin
- fTaskStatus := TWorkTaskStatus.wtsRunning;
- DoInitialize;
- repeat
- try
- if Assigned(fExecuteProc) then fExecuteProc(Self);
- fTaskStatus := TWorkTaskStatus.wtsDone;
- fFaultControl.SuccessExecution;
- except
- on E : Exception do
- begin
- fTaskStatus := TWorkTaskStatus.wtsException;
- {$IFNDEF FPC}
- fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
- {$ELSE}
- fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
- {$ENDIF}
- end;
- end;
- until not fFaultControl.NeedToRetry;
- end;
- procedure TTask.DoInitialize;
- begin
- try
- fFaultControl.Reset;
- if Assigned(fInitializeProc) then fInitializeProc(Self);
- except
- on E : Exception do
- begin
- raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
- end;
- end;
- end;
- function TTask.Done: Boolean;
- begin
- Result := not fFaultControl.TaskFailed;
- end;
- function TTask.Failed: Boolean;
- begin
- Result := fFaultControl.TaskFailed;
- end;
- function TTask.CircuitBreaked: Boolean;
- begin
- Result := fFaultControl.CircuitBreaked;
- end;
- function TTask.LastException: Exception;
- begin
- Result := fFaultControl.LastException;
- end;
- function TTask.MaxRetries: Integer;
- begin
- Result := fFaultControl.MaxRetries;
- end;
- function TTask.NumRetries: Integer;
- begin
- Result := fFaultControl.NumRetries;
- end;
- procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
- begin
- vStopRetries := False;
- if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
- end;
- procedure TTask.DoTerminate;
- begin
- if Assigned(fTerminateProc) then fTerminateProc(Self);
- end;
- function TTask.GetIdTask: Int64;
- begin
- Result := fIdTask;
- end;
- procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
- begin
- {$IFDEF FPC}
- if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
- {$ENDIF}
- fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
- fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
- fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
- end;
- procedure TTask.SetIdTask(Value : Int64);
- begin
- fIdTask := Value;
- end;
- function TTask.GetNumWorker: Integer;
- begin
- Result := fNumWorker;
- end;
- function TTask.GetParam(aIndex: Integer): TFlexValue;
- begin
- Result := fParamList[aIndex].Value;
- end;
- function TTask.GetParam(const aName: string): TFlexValue;
- var
- paramvalue : TParamValue;
- begin
- for paramvalue in fParamList do
- begin
- if CompareText(paramvalue.Name,aName) = 0 then
- begin
- Exit(paramvalue.Value)
- end;
- end;
- //if not exists
- raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
- end;
- function TTask.GetParam2(aIndex: Integer): PFlexValue;
- begin
- Result := @fParamList[aIndex].Value;
- end;
- function TTask.GetResult: TFlexValue;
- begin
- Result := fResult;
- end;
- function TTask.IsEnabled: Boolean;
- begin
- Result := fEnabled;
- end;
- procedure TTask.SetNumWorker(Value: Integer);
- begin
- fTaskStatus := TWorkTaskStatus.wtsAssigned;
- fNumWorker := Value;
- end;
- procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
- begin
- if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
- fParamList[aIndex].Value := Value;
- end;
- procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
- var
- paramvalue : TParamValue;
- begin
- //check if already exists parameter
- for paramvalue in fParamList do
- begin
- if CompareText(paramvalue.Name,aName) = 0 then
- begin
- paramvalue.Value := Value;
- Exit;
- end;
- end;
- //if not exists, create one
- fParamList.Add(TParamValue.Create(aName,Value,aOwned));
- end;
- procedure TTask.SetParam(const aName: string; Value: TFlexValue);
- begin
- SetParam(aName,Value,False);
- end;
- procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
- begin
- fFaultControl.MaxRetries := aMaxRetries;
- fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
- fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
- fCustomFaultPolicy := True;
- end;
- procedure TTask.SetResult(aValue: TFlexValue);
- begin
- fResult := aValue;
- end;
- procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
- begin
- fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
- fFaultControl.WaitTimeBetweenRetriesMS := 0;
- fFaultControl.WaitTimeMultiplierFactor := 0;
- fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
- fCustomFaultPolicy := True;
- end;
- function TTask.TaskStatus: TWorkTaskStatus;
- begin
- Result := fTaskStatus;
- end;
- { TWorkTask }
- function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
- begin
- fExceptProc := aTaskProc;
- Result := Self;
- end;
- function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
- begin
- fInitializeProc := aTaskProc;
- Result := Self;
- end;
- function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
- begin
- fRetryProc := aTaskProc;
- Result := Self;
- end;
- function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
- begin
- fTerminateProc := aTaskProc;
- Result := Self;
- end;
- procedure TWorkTask.Run;
- begin
- fEnabled := True;
- end;
- function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
- begin
- Result := Self;
- SetParam(aName,aValue);
- end;
- function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
- begin
- Result := Self;
- SetParam(aName,aValue,aOwned);
- end;
- function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,0,0);
- end;
- function TWorkTask.RetryForever: IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,0,0);
- end;
- function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0);
- end;
- function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
- end;
- function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(aWaitTimeArray);
- end;
- function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0);
- end;
- function TWorkTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
- end;
- { TBackgroundTasks }
- procedure TBackgroundTasks.CancelAll;
- begin
- fTaskQueue.Clear;
- end;
- constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
- begin
- fMaxQueue := 0;
- fConcurrentWorkers := aConcurrentWorkers;
- fInsertTimeout := INFINITE;
- fExtractTimeout := 5000;
- fNumPushedTasks := 0;
- fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
- end;
- destructor TBackgroundTasks.Destroy;
- begin
- CancelAll;
- fTaskQueue.DoShutDown;
- //while fTaskQueue.QueueSize > 0 do Sleep(0);
- if Assigned(fWorkerPool) then fWorkerPool.Free;
- if Assigned(fTaskQueue) then fTaskQueue.Free;
- inherited;
- end;
- function TBackgroundTasks.GetTaskQueue: Cardinal;
- begin
- Result := fTaskQueue.QueueSize;
- end;
- function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
- begin
- Result := AddTask([],False,aTaskProc);
- end;
- function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
- var
- worktask : IWorkTask;
- begin
- if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
- worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
- Inc(fNumPushedTasks);
- worktask.SetIdTask(fNumPushedTasks);
- if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
- begin
- Result := worktask;
- end;
- end;
- function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
- begin
- Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
- TTask(Result).ExecuteWithSync := True;
- end;
- function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
- begin
- Result := AddTask_Sync([],False,aTaskProc);
- end;
- procedure TBackgroundTasks.Start;
- var
- i : Integer;
- worker : TWorker;
- begin
- //create workers
- if fWorkerPool <> nil then fWorkerPool.Free;
- fWorkerPool := TWorkerPool.Create(True);
- for i := 1 to fConcurrentWorkers do
- begin
- worker := TQueueWorker.Create(i,fTaskQueue);
- fWorkerPool.Add(worker);
- worker.Start;
- end;
- end;
- { TWorker }
- constructor TWorker.Create;
- begin
- inherited Create(True);
- fDefaultFaultPolicy := TFaultPolicy.Create;
- fStatus := TWorkerStatus.wsSuspended;
- FreeOnTerminate := False;
- end;
- destructor TWorker.Destroy;
- begin
- if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
- inherited;
- end;
- procedure TWorker.SetFaultPolicy(aTask: TTask);
- begin
- if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
- end;
- procedure TWorker.Execute;
- begin
- end;
- procedure TWorker.ExecuteTask;
- begin
- fCurrentTask.DoExecute;
- end;
- procedure TWorker.TerminateTask;
- begin
- fCurrentTask.DoTerminate;
- end;
- { TSimpleWorker }
- constructor TSimpleWorker.Create(aTask : ITask);
- begin
- inherited Create;
- fCurrentTask := aTask;
- FreeOnTerminate := True;
- end;
- procedure TSimpleWorker.Execute;
- begin
- fStatus := TWorkerStatus.wsIdle;
- while not Terminated do
- begin
- if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
- try
- fStatus := TWorkerStatus.wsWorking;
- try
- if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
- else fCurrentTask.DoExecute;
- except
- on E : Exception do
- begin
- if fCurrentTask <> nil then fCurrentTask.DoException(E)
- else raise ETaskExecutionError.Create(e.Message);
- end;
- end;
- finally
- if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
- else fCurrentTask.DoTerminate;
- fStatus := TWorkerStatus.wsIdle;
- Terminate;
- end;
- end;
- fStatus := TWorkerStatus.wsSuspended
- end;
- { TQueueWorker }
- constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
- begin
- inherited Create;
- fNumWorker := aNumWorker;
- fTaskQueue := aTaskQueue;
- end;
- procedure TQueueWorker.Execute;
- begin
- fStatus := TWorkerStatus.wsIdle;
- while not Terminated do
- begin
- fCurrentTask := fTaskQueue.PopItem;
- if fCurrentTask <> nil then
- try
- fStatus := TWorkerStatus.wsWorking;
- try
- fCurrentIdTask := fCurrentTask.GetIdTask;
- SetFaultPolicy(TTask(fCurrentTask));
- if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
- else fCurrentTask.DoExecute;
- except
- on E : Exception do
- begin
- if fCurrentTask <> nil then fCurrentTask.DoException(E)
- else raise ETaskExecutionError.Create(e.Message);
- end;
- end;
- finally
- if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
- else fCurrentTask.DoTerminate;
- fStatus := TWorkerStatus.wsIdle;
- end;
- end;
- fStatus := TWorkerStatus.wsSuspended
- end;
- { TScheduledWorker }
- constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
- begin
- inherited Create;
- {$IFNDEF DELPHILINUX}
- NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
- {$ENDIF}
- FreeOnTerminate := True;
- fCurrentTask := aScheduledTask;
- end;
- procedure TScheduledWorker.Execute;
- begin
- fStatus := TWorkerStatus.wsIdle;
- if Assigned(fCurrentTask) then
- begin
- try
- fStatus := TWorkerStatus.wsWorking;
- try
- SetFaultPolicy(TTask(fCurrentTask));
- if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
- else fCurrentTask.DoExecute;
- fStatus := TWorkerStatus.wsIdle;
- except
- on E : Exception do
- begin
- if fCurrentTask <> nil then fCurrentTask.DoException(E)
- else raise ETaskExecutionError.Create(e.Message);
- end;
- end;
- finally
- if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
- else fCurrentTask.DoTerminate;
- //check if expired
- if (fCurrentTask as IScheduledTask).IsFinished then
- begin
- if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
- else (fCurrentTask as IScheduledTask).DoExpire;
- end;
- end;
- end;
- fCurrentTask := nil;
- fStatus := TWorkerStatus.wsSuspended;
- end;
- procedure TScheduledWorker.ExpireTask;
- begin
- (fCurrentTask as IScheduledTask).DoExpire;
- end;
- { TScheduledTasks }
- function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
- begin
- Result := AddTask(aTaskName,[],False,aTaskProc);
- end;
- function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
- var
- scheduletask : TScheduledTask;
- begin
- scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
- scheduletask.Name := aTaskName;
- Inc(fNumPushedTasks);
- scheduletask.SetIdTask(fNumPushedTasks);
- fTaskList.Add(scheduletask);
- Result := scheduletask;
- end;
- function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
- begin
- Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
- TTask(Result).ExecuteWithSync := True;
- end;
- function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
- begin
- Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
- end;
- constructor TScheduledTasks.Create;
- begin
- fNumPushedTasks := 0;
- fIsStarted := False;
- fFaultPolicy := TFaultPolicy.Create;
- fTaskList := TScheduledTaskList.Create;
- end;
- destructor TScheduledTasks.Destroy;
- begin
- if Assigned(fScheduler) then
- begin
- fScheduler.Terminate;
- fScheduler.WaitFor;
- fScheduler.Free;
- end;
- if Assigned(fTaskList) then fTaskList.Free;
- if Assigned(fFaultPolicy) then fFaultPolicy.Free;
- inherited;
- end;
- function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
- begin
- Result := fScheduler.Get(aIdTask);
- end;
- function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
- begin
- if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
- Result := fScheduler.Get(aTaskName);
- end;
- procedure TScheduledTasks.Start;
- begin
- if fIsStarted then Exit;
- if not Assigned(fScheduler) then
- begin
- fScheduler := TScheduler.Create(fTaskList);
- fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
- end;
- fScheduler.Start;
- fIsStarted := True;
- end;
- procedure TScheduledTasks.Stop;
- begin
- if Assigned(fScheduler) then fScheduler.Terminate;
- fIsStarted := False;
- end;
- { TScheduledTask }
- function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
- begin
- Result := Self;
- SetParam(aName,aValue);
- end;
- function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
- begin
- Result := Self;
- SetParam(aName,aValue);
- end;
- function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := aStartDate;
- fNextExecution := aStartDate;
- end;
- function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := IncMinute(Now(),aMinutes);
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := IncSecond(Now(),aSeconds);
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.StartNow: IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := Now();
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.StartOnDayChange: IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
- begin
- Result := Self;
- ClearSchedule;
- fScheduleMode := TScheduleMode.smRunOnce;
- fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
- fNextExecution := fStartDate;
- end;
- function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,0,0);
- end;
- function TScheduledTask.RetryForever: IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,0,0);
- end;
- function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0);
- end;
- function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
- end;
- function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(aWaitTimeArray);
- end;
- function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0);
- end;
- function TScheduledTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
- begin
- Result := Self;
- SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
- end;
- procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
- begin
- if fStartDate = 0.0 then ClearSchedule;
- fScheduleMode := TScheduleMode.smRepeatMode;
- fTimeMeasure := aTimeMeasure;
- fTimeInterval := aInterval;
- if fStartDate < Now() then fStartDate := Now();
- fNextExecution := fStartDate;
- fEnabled := True;
- end;
- procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
- begin
- if fStartDate = 0.0 then ClearSchedule;
- fScheduleMode := TScheduleMode.smRepeatMode;
- fTimeMeasure := aTimeMeasure;
- fTimeInterval := aInterval;
- if fStartDate < Now() then fStartDate := Now();
- fExpirationDate := aEndTime;
- fNextExecution := fStartDate;
- fEnabled := True;
- end;
- procedure TScheduledTask.RepeatEveryDay;
- begin
- RepeatEvery(1,tmDays);
- end;
- procedure TScheduledTask.RepeatEveryWeek;
- begin
- RepeatEvery(7,tmDays);
- end;
- procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
- begin
- if fStartDate = 0.0 then ClearSchedule;
- fScheduleMode := TScheduleMode.smRepeatMode;
- fTimeMeasure := aTimeMeasure;
- fTimeInterval := aInterval;
- if fStartDate < Now() then fStartDate := Now();
- fExpirationTimes := aRepeatTimes;
- fNextExecution := fStartDate;
- fEnabled := True;
- end;
- procedure TScheduledTask.RunOnce;
- begin
- fScheduleMode := TScheduleMode.smRunOnce;
- if fStartDate < Now() then fStartDate := Now();
- fNextExecution := fStartDate;
- fEnabled := True;
- end;
- procedure TScheduledTask.Cancel;
- begin
- fFinished := True;
- end;
- function TScheduledTask.CheckSchedule: Boolean;
- begin
- Result := False;
- if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
- if fScheduleMode = TScheduleMode.smRunOnce then
- begin
- //if start date reached
- if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
- begin
- fLastExecution := Now();
- Inc(fExecutionTimes);
- fFinished := True;
- Result := True;
- end;
- end
- else
- begin
- //if next execution reached
- if Now() >= fNextExecution then
- begin
- //check expiration limits
- if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
- ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
- begin
- fFinished := True;
- Exit;
- end;
- //calculate next execution
- case fTimeMeasure of
- tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
- tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
- tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
- tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
- tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
- end;
- if Now() > fNextExecution then Result := False //avoid execution if system time was altered
- else
- begin
- fLastExecution := Now();
- Inc(fExecutionTimes);
- Result := True;
- end;
- end;
- end;
- end;
- procedure TScheduledTask.ClearSchedule;
- begin
- inherited;
- fFinished := False;
- fStartDate := 0.0;
- fLastExecution := 0.0;
- fNextExecution := 0.0;
- fExpirationDate := 0.0;
- fScheduleMode := TScheduleMode.smRunOnce;
- fTimeMeasure := TTimeMeasure.tmSeconds;
- fTimeInterval := 0;
- end;
- procedure TScheduledTask.DoExpire;
- begin
- if Assigned(fExpiredProc) then fExpiredProc(Self);
- fEnabled := False;
- end;
- function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
- begin
- Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
- end;
- function TScheduledTask.GetTaskName: string;
- begin
- Result := fName;
- end;
- function TScheduledTask.IsFinished: Boolean;
- begin
- Result := fFinished;
- end;
- function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
- begin
- fExceptProc := aTaskProc;
- Result := Self;
- end;
- function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
- begin
- Result := OnException(aTaskProc);
- TTask(Result).ExceptionWithSync := True;
- end;
- function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
- begin
- fRetryProc := aTaskProc;
- Result := Self;
- end;
- function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
- begin
- fExpiredProc := aTaskProc;
- Result := Self;
- end;
- function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
- begin
- Result := OnExpired(aTaskProc);
- TScheduledTask(Result).ExpireWithSync := True;
- end;
- function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
- begin
- fInitializeProc := aTaskProc;
- Result := Self;
- end;
- function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
- begin
- fTerminateProc := aTaskProc;
- Result := Self;
- end;
- function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
- begin
- Result := OnTerminated(aTaskProc);
- TTask(Result).TerminateWithSync := True;
- end;
- { TScheduler }
- constructor TScheduler.Create(aTaskList : TScheduledTaskList);
- begin
- inherited Create(True);
- FreeOnTerminate := False;
- fListLock := TCriticalSection.Create;
- fRemoveTaskAfterExpiration := False;
- fTaskList := aTaskList;
- {$IFDEF FPC}
- fCondVar := TSimpleEvent.Create;
- {$ELSE}
- fCondVar := TSimpleEvent.Create(nil,True,False,'');
- {$ENDIF}
- end;
- destructor TScheduler.Destroy;
- begin
- fCondVar.SetEvent;
- fCondVar.Free;
- fTaskList := nil;
- fListLock.Free;
- inherited;
- end;
- procedure TScheduler.Execute;
- var
- task : IScheduledTask;
- worker : TScheduledWorker;
- numworker : Int64;
- begin
- numworker := 0;
- while not Terminated do
- begin
- fListLock.Enter;
- try
- for task in fTaskList do
- begin
- if (task.IsEnabled) and (not task.IsFinished) then
- begin
- if task.CheckSchedule then
- begin
- Inc(numworker);
- worker := TScheduledWorker.Create(numworker,task);
- worker.Start;
- end;
- end
- else
- begin
- if task.IsEnabled then
- begin
- //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
- // else task.DoExpire;
- if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
- end;
- end;
- end;
- task := nil;
- finally
- fListLock.Leave;
- end;
- fCondVar.WaitFor(250);
- end;
- end;
- function TScheduler.Add(aTask: TScheduledTask): Integer;
- begin
- Result := fTaskList.Add(aTask);
- end;
- function TScheduler.Get(aIdTask: Int64): IScheduledTask;
- var
- task : IScheduledTask;
- begin
- fListLock.Enter;
- try
- for task in fTaskList do
- begin
- if task.IdTask = aIdTask then Exit(task);
- end;
- finally
- fListLock.Leave;
- end;
- end;
- function TScheduler.Get(const aTaskName: string): IScheduledTask;
- var
- task : IScheduledTask;
- begin
- fListLock.Enter;
- try
- for task in fTaskList do
- begin
- if CompareText(task.Name,aTaskName) = 0 then Exit(task);
- end;
- finally
- fListLock.Leave;
- end;
- end;
- { TAdvThread }
- constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
- begin
- inherited Create(True);
- FreeOnTerminate := True;
- fExecuteWithSync := aSynchronize;
- fExecuteProc := aProc;
- end;
- procedure TAdvThread.DoExecute;
- begin
- try
- if Assigned(fExecuteProc) then fExecuteProc;
- except
- on E : Exception do
- begin
- {$IFNDEF FPC}
- if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
- {$ELSE}
- if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
- {$ENDIF}
- else raise e;
- end;
- end;
- end;
- procedure TAdvThread.CallToTerminate;
- begin
- if Assigned(fTerminateProc) then fTerminateProc;
- end;
- procedure TAdvThread.DoTerminate;
- begin
- if fTerminateWithSync then Synchronize(CallToTerminate)
- else CallToTerminate;
- end;
- procedure TAdvThread.Execute;
- begin
- if fExecuteWithSync then Synchronize(DoExecute)
- else DoExecute;
- end;
- procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
- begin
- fExceptionProc := aProc;
- end;
- procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
- begin
- fTerminateWithSync := aSynchronize;
- fTerminateProc := aProc;
- end;
- { TRunTask }
- class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
- begin
- Result := Execute([],False,aTaskProc);
- end;
- class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
- begin
- Result := Execute_Sync([],False,aTaskProc);
- end;
- class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
- var
- task : TWorkTask;
- worker : TSimpleWorker;
- begin
- task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
- task.ExecuteWithSync := False;
- Result := task;
- worker := TSimpleWorker.Create(task);
- worker.Start;
- end;
- class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
- var
- task : TWorkTask;
- worker : TSimpleWorker;
- begin
- task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
- task.ExecuteWithSync := True;
- Result := task;
- worker := TSimpleWorker.Create(task);
- worker.Start;
- end;
- { TParamValue }
- constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
- begin
- inherited Create;
- fName := aName;
- fValue := aValue;
- fOwned := aOwnedValue;
- end;
- constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
- begin
- inherited Create;
- fName := aName;
- fValue := aValue;
- fOwned := aOwnedValue;
- end;
- constructor TParamValue.Create;
- begin
- fName := '';
- fOwned := False;
- end;
- destructor TParamValue.Destroy;
- begin
- {$IFDEF FPC}
- fValue._Release;
- {$ENDIF}
- if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
- inherited;
- end;
- end.
|