Quick.Threads.pas 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267
  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 22/08/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  186. ETaskInitializationError = class(Exception);
  187. ETaskExecutionError = class(Exception);
  188. ETaskParamError = class(Exception);
  189. ETaskSchedulerError = class(Exception);
  190. ITask = interface
  191. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  192. function GetParam(aIndex : Integer) : TFlexValue; overload;
  193. function GetParam(const aName : string) : TFlexValue; overload;
  194. function GetParam2(aIndex : Integer) : PFlexValue;
  195. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  196. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  197. function TaskStatus : TWorkTaskStatus;
  198. function GetNumWorker : Integer;
  199. procedure SetNumWorker(Value : Integer);
  200. function GetIdTask : Int64;
  201. procedure SetIdTask(Value : Int64);
  202. function GetResult : TFlexValue;
  203. procedure SetResult(aValue : TFlexValue);
  204. procedure DoExecute;
  205. procedure DoException(aException : Exception);
  206. procedure DoTerminate;
  207. {$IFNDEF FPC}
  208. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  209. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  210. {$ELSE}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  212. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ENDIF}
  214. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  215. property Result : TFlexValue read GetResult write SetResult;
  216. property IdTask : Int64 read GetIdTask;
  217. function Done : Boolean;
  218. function Failed : Boolean;
  219. function NumRetries : Integer;
  220. function MaxRetries : Integer;
  221. function LastException : Exception;
  222. function CircuitBreaked : Boolean;
  223. function IsEnabled : Boolean;
  224. end;
  225. {$IFNDEF FPC}
  226. TTaskProc = reference to procedure(task : ITask);
  227. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  228. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  229. {$ELSE}
  230. TTaskProc = procedure(task : ITask) of object;
  231. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  232. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  233. {$ENDIF}
  234. IWorkTask = interface(ITask)
  235. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  236. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  237. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  238. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  239. function Retry(aMaxRetries : Integer) : IWorkTask;
  240. function RetryForever : IWorkTask;
  241. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  242. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  243. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  244. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  245. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  246. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  247. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  248. procedure Run;
  249. end;
  250. IScheduledTask = interface(ITask)
  251. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  252. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  253. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  254. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  255. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  256. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  257. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  259. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  260. function Retry(aMaxRetries : Integer) : IScheduledTask;
  261. function RetryForever : IScheduledTask;
  262. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  263. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  264. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  265. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  266. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  267. function CheckSchedule : Boolean;
  268. procedure DoExpire;
  269. function GetTaskName : string;
  270. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  271. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  272. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  273. function StartOnDayChange : IScheduledTask;
  274. function StartNow : IScheduledTask;
  275. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  276. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  277. procedure RunOnce;
  278. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  279. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  280. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  281. procedure RepeatEveryDay;
  282. procedure RepeatEveryWeek;
  283. function IsFinished : Boolean;
  284. procedure Cancel;
  285. property Name : string read GetTaskName;
  286. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  287. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  288. end;
  289. TTask = class(TInterfacedObject,ITask)
  290. private
  291. fIdTask : Int64;
  292. fNumWorker : Integer;
  293. fNumRetries : Integer;
  294. fParamList : TParamList;
  295. fInitializeProc : TTaskProc;
  296. fExecuteProc : TTaskProc;
  297. fExceptProc : TTaskExceptionProc;
  298. fTerminateProc : TTaskProc;
  299. fExpiredProc : TTaskProc;
  300. fTaskStatus : TWorkTaskStatus;
  301. fOwnedParams : Boolean;
  302. fEnabled : Boolean;
  303. fExecuteWithSync : Boolean;
  304. fExceptionWithSync : Boolean;
  305. fRetryProc : TTaskRetryProc;
  306. fTerminateWithSync : Boolean;
  307. fFaultControl : TFaultControl;
  308. fCustomFaultPolicy : Boolean;
  309. fResult : TFlexValue;
  310. function GetParam(aIndex : Integer) : TFlexValue; overload;
  311. function GetParam(const aName : string) : TFlexValue; overload;
  312. function GetParam2(aIndex : Integer) : PFlexValue;
  313. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  314. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  315. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  316. procedure DoInitialize;
  317. procedure DoExecute;
  318. procedure DoException(aException : Exception);
  319. procedure DoTerminate;
  320. function GetNumWorker : Integer;
  321. procedure SetNumWorker(Value : Integer);
  322. function GetIdTask : Int64;
  323. procedure SetIdTask(Value : Int64);
  324. function GetResult : TFlexValue;
  325. procedure SetResult(aValue : TFlexValue);
  326. protected
  327. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  328. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  329. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  330. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  331. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  332. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  333. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  334. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  335. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  336. public
  337. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  338. destructor Destroy; override;
  339. property IdTask : Int64 read GetIdTask;
  340. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  341. function IsEnabled : Boolean;
  342. function TaskStatus : TWorkTaskStatus;
  343. function Done : Boolean;
  344. function Failed : Boolean;
  345. function NumRetries : Integer;
  346. function MaxRetries : Integer;
  347. function LastException : Exception;
  348. function CircuitBreaked : Boolean;
  349. end;
  350. TWorkTask = class(TTask,IWorkTask)
  351. public
  352. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  353. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  354. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  355. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  356. function Retry(aMaxRetries : Integer) : IWorkTask;
  357. function RetryForever : IWorkTask;
  358. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  359. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  360. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  361. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  362. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  363. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  364. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  365. procedure Run; virtual;
  366. end;
  367. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  368. TScheduledTask = class(TTask,IScheduledTask)
  369. private
  370. fName : string;
  371. fExecutionTimes : Integer;
  372. fScheduleMode : TScheduleMode;
  373. fTimeInterval : Integer;
  374. fTimeMeasure : TTimeMeasure;
  375. fStartDate : TDateTime;
  376. fLastExecution : TDateTime;
  377. fNextExecution : TDateTime;
  378. fExpirationDate : TDateTime;
  379. fExpirationTimes : Integer;
  380. fFinished : Boolean;
  381. fExpireWithSync: Boolean;
  382. procedure ClearSchedule;
  383. function CheckSchedule : Boolean;
  384. procedure DoExpire;
  385. function GetTaskName : string;
  386. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  387. protected
  388. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  389. public
  390. property Name : string read fName write fName;
  391. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  392. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  393. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  394. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  395. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  396. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  397. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  398. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  399. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  400. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  401. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  402. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  403. function StartOnDayChange : IScheduledTask;
  404. function StartNow : IScheduledTask;
  405. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  406. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  407. procedure RunOnce;
  408. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  409. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  410. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  411. procedure RepeatEveryDay;
  412. procedure RepeatEveryWeek;
  413. function Retry(aMaxRetries : Integer) : IScheduledTask;
  414. function RetryForever : IScheduledTask;
  415. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  416. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  417. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  418. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  419. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  420. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  421. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  422. function IsFinished : Boolean;
  423. procedure Cancel;
  424. end;
  425. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  426. TWorker = class(TThread)
  427. protected
  428. fStatus : TWorkerStatus;
  429. fCurrentTask : ITask;
  430. fDefaultFaultPolicy : TFaultPolicy;
  431. procedure ExecuteTask;
  432. procedure TerminateTask;
  433. public
  434. constructor Create;
  435. destructor Destroy; override;
  436. property Status : TWorkerStatus read fStatus;
  437. procedure SetFaultPolicy(aTask : TTask);
  438. procedure Execute; override;
  439. end;
  440. TSimpleWorker = class(TWorker)
  441. public
  442. constructor Create(aTask : ITask);
  443. procedure Execute; override;
  444. end;
  445. TQueueWorker = class(TWorker)
  446. private
  447. fCurrentIdTask : Integer;
  448. fNumWorker : Integer;
  449. fTaskQueue : TTaskQueue;
  450. public
  451. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  452. property NumWorker : Integer read fNumWorker;
  453. procedure Execute; override;
  454. end;
  455. TScheduledWorker = class(TWorker)
  456. private
  457. procedure ExpireTask;
  458. public
  459. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  460. procedure Execute; override;
  461. end;
  462. TWorkerPool = TObjectList<TWorker>;
  463. TRunTask = class
  464. public
  465. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  466. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  467. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  468. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  469. end;
  470. TBackgroundTasks = class
  471. private
  472. fMaxQueue : Integer;
  473. fWorkerPool : TWorkerPool;
  474. fConcurrentWorkers : Integer;
  475. fInsertTimeout : Cardinal;
  476. fExtractTimeout : Cardinal;
  477. fTaskQueue : TTaskQueue;
  478. fNumPushedTasks : Int64;
  479. function GetTaskQueue : Cardinal;
  480. public
  481. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  482. destructor Destroy; override;
  483. property MaxQueue : Integer read fMaxQueue;
  484. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  485. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  486. property TaskQueued : Cardinal read GetTaskQueue;
  487. property NumPushedTasks : Int64 read fNumPushedTasks;
  488. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  489. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  490. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  491. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  492. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  493. procedure Start;
  494. procedure CancelAll;
  495. end;
  496. TScheduledTaskList = TList<IScheduledTask>;
  497. TScheduler = class(TThread)
  498. private
  499. fListLock : TCriticalSection;
  500. fCondVar : TSimpleEvent;
  501. fTaskList : TScheduledTaskList;
  502. fRemoveTaskAfterExpiration : Boolean;
  503. public
  504. constructor Create(aTaskList : TScheduledTaskList);
  505. destructor Destroy; override;
  506. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  507. procedure Execute; override;
  508. function Add(aTask : TScheduledTask) : Integer;
  509. function Get(aIdTask : Int64) : IScheduledTask; overload;
  510. function Get(const aTaskName : string) : IScheduledTask; overload;
  511. end;
  512. TScheduledTasks = class
  513. private
  514. fTaskList : TScheduledTaskList;
  515. fScheduler : TScheduler;
  516. fNumPushedTasks : Int64;
  517. fRemoveTaskAfterExpiration : Boolean;
  518. fIsStarted : Boolean;
  519. fFaultPolicy : TFaultPolicy;
  520. public
  521. constructor Create;
  522. destructor Destroy; override;
  523. property NumPushedTasks : Int64 read fNumPushedTasks;
  524. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  525. property IsStarted : Boolean read fIsStarted;
  526. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  527. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  528. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  529. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  530. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  531. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  532. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  533. procedure Start;
  534. procedure Stop;
  535. end;
  536. implementation
  537. { TThreadedQueueCS<T> }
  538. procedure TThreadedQueueCS<T>.Clear;
  539. var
  540. obj : T;
  541. begin
  542. FQueueLock.Enter;
  543. try
  544. for obj in FQueue do
  545. begin
  546. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  547. end;
  548. SetLength(FQueue,0);
  549. finally
  550. FQueueLock.Leave;
  551. end;
  552. {$IFDEF FPC}
  553. FQueueCondVar.SetEvent;
  554. {$ELSE}
  555. FQueueCondVar.ReleaseAll;
  556. {$ENDIF}
  557. end;
  558. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  559. begin
  560. inherited Create;
  561. SetLength(FQueue, AQueueDepth);
  562. FQueueLock := TCriticalSection.Create;
  563. {$IFDEF FPC}
  564. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  565. {$ELSE}
  566. FQueueCondVar := TConditionVariableCS.Create;
  567. {$ENDIF}
  568. FPushTimeout := PushTimeout;
  569. FPopTimeout := PopTimeout;
  570. end;
  571. destructor TThreadedQueueCS<T>.Destroy;
  572. begin
  573. DoShutDown;
  574. FQueueLock.Free;
  575. FQueueCondVar.Free;
  576. inherited;
  577. end;
  578. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  579. begin
  580. FQueueLock.Enter;
  581. try
  582. SetLength(FQueue, Length(FQueue) + ADelta);
  583. finally
  584. FQueueLock.Leave;
  585. end;
  586. {$IFDEF FPC}
  587. FQueueCondVar.SetEvent;
  588. {$ELSE}
  589. FQueueCondVar.ReleaseAll;
  590. {$ENDIF}
  591. end;
  592. function TThreadedQueueCS<T>.PopItem: T;
  593. var
  594. LQueueSize: Integer;
  595. begin
  596. PopItem(LQueueSize, Result);
  597. end;
  598. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  599. begin
  600. AItem := Default(T);
  601. FQueueLock.Enter;
  602. try
  603. Result := wrSignaled;
  604. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  605. begin
  606. {$IFDEF FPC}
  607. Result := FQueueCondVar.WaitFor(FPopTimeout);
  608. {$ELSE}
  609. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  610. {$ENDIF}
  611. end;
  612. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  613. AItem := FQueue[FQueueOffset];
  614. FQueue[FQueueOffset] := Default(T);
  615. if FQueueSize = Length(FQueue) then
  616. begin
  617. {$IFDEF FPC}
  618. FQueueCondVar.SetEvent;
  619. {$ELSE}
  620. FQueueCondVar.ReleaseAll;
  621. {$ENDIF}
  622. end;
  623. Dec(FQueueSize);
  624. Inc(FQueueOffset);
  625. Inc(FTotalItemsPopped);
  626. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  627. finally
  628. AQueueSize := FQueueSize;
  629. FQueueLock.Leave;
  630. end;
  631. end;
  632. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  633. var
  634. LQueueSize: Integer;
  635. begin
  636. Result := PopItem(LQueueSize, AItem);
  637. end;
  638. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  639. begin
  640. PopItem(AQueueSize, Result);
  641. end;
  642. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  643. var
  644. LQueueSize: Integer;
  645. begin
  646. Result := PushItem(AItem, LQueueSize);
  647. end;
  648. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  649. begin
  650. FQueueLock.Enter;
  651. try
  652. Result := wrSignaled;
  653. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  654. begin
  655. {$IFDEF FPC}
  656. Result := FQueueCondVar.WaitFor(FPushTimeout);
  657. {$ELSE}
  658. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  659. {$ENDIF}
  660. end;
  661. if FShutDown or (Result <> wrSignaled) then Exit;
  662. if FQueueSize = 0 then
  663. begin
  664. {$IFDEF FPC}
  665. FQueueCondVar.SetEvent;
  666. {$ELSE}
  667. FQueueCondVar.ReleaseAll;
  668. {$ENDIF}
  669. end;
  670. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  671. Inc(FQueueSize);
  672. Inc(FTotalItemsPushed);
  673. finally
  674. AQueueSize := FQueueSize;
  675. FQueueLock.Leave;
  676. end;
  677. end;
  678. procedure TThreadedQueueCS<T>.DoShutDown;
  679. begin
  680. FShutDown := True;
  681. {$IFDEF FPC}
  682. FQueueCondVar.SetEvent;
  683. {$ELSE}
  684. FQueueCondVar.ReleaseAll;
  685. {$ENDIF}
  686. end;
  687. { TThreadedQueueList<T> }
  688. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  689. begin
  690. inherited Create;
  691. fQueue := TQueue<T>.Create;
  692. fQueue.Capacity := AQueueDepth;
  693. fQueueSize := 0;
  694. fQueueLock := TCriticalSection.Create;
  695. {$IFDEF FPC}
  696. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  697. {$ELSE}
  698. fQueueCondVar := TConditionVariableCS.Create;
  699. {$ENDIF}
  700. fPushTimeout := PushTimeout;
  701. fPopTimeout := PopTimeout;
  702. end;
  703. destructor TThreadedQueueList<T>.Destroy;
  704. begin
  705. DoShutDown;
  706. fQueueLock.Free;
  707. fQueueCondVar.Free;
  708. fQueue.Free;
  709. inherited;
  710. end;
  711. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  712. begin
  713. fQueueLock.Enter;
  714. try
  715. fQueue.Capacity := fQueue.Capacity + ADelta;
  716. finally
  717. fQueueLock.Leave;
  718. end;
  719. {$IFDEF FPC}
  720. FQueueCondVar.SetEvent;
  721. {$ELSE}
  722. FQueueCondVar.ReleaseAll;
  723. {$ENDIF}
  724. end;
  725. function TThreadedQueueList<T>.PopItem: T;
  726. var
  727. LQueueSize: Integer;
  728. begin
  729. PopItem(LQueueSize, Result);
  730. end;
  731. {$IFDEF FPC}
  732. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  733. //var
  734. //crono : TChronometer;
  735. begin
  736. AItem := Default(T);
  737. //crono := TChronometer.Create(False);
  738. try
  739. Result := wrSignaled;
  740. //writeln('popitem');
  741. //crono.Start;
  742. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  743. begin
  744. //crono.Start;
  745. Result := FQueueCondVar.WaitFor(FPopTimeout);
  746. //crono.Stop;
  747. //writeln('in: ' + crono.ElapsedTime);
  748. //if result = twaitresult.wrError then result := twaitresult.wrError;
  749. end;
  750. //crono.Stop;
  751. //writeln('out: ' + crono.ElapsedTime);
  752. fQueueLock.Enter;
  753. try
  754. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  755. AItem := fQueue.Extract;
  756. Dec(FQueueSize);
  757. Inc(fTotalItemsPopped);
  758. finally
  759. fQueueLock.Leave;
  760. end;
  761. finally
  762. AQueueSize := fQueueSize;
  763. end;
  764. end;
  765. {$ELSE}
  766. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  767. begin
  768. AItem := Default(T);
  769. fQueueLock.Enter;
  770. try
  771. Result := wrSignaled;
  772. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  773. begin
  774. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  775. end;
  776. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  777. AItem := fQueue.Extract;
  778. if fQueueSize = fQueue.Count then
  779. begin
  780. FQueueCondVar.ReleaseAll;
  781. end;
  782. Dec(FQueueSize);
  783. Inc(fTotalItemsPopped);
  784. finally
  785. AQueueSize := fQueueSize;
  786. fQueueLock.Leave;
  787. end;
  788. end;
  789. {$ENDIF}
  790. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  791. var
  792. LQueueSize: Integer;
  793. begin
  794. Result := PopItem(LQueueSize, AItem);
  795. end;
  796. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  797. begin
  798. PopItem(AQueueSize, Result);
  799. end;
  800. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  801. var
  802. LQueueSize: Integer;
  803. begin
  804. Result := PushItem(AItem, LQueueSize);
  805. end;
  806. {$IFDEF FPC}
  807. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  808. begin
  809. FQueueLock.Enter;
  810. try
  811. Result := wrSignaled;
  812. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  813. //begin
  814. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  815. //end;
  816. if fShutDown or (Result <> wrSignaled) then Exit;
  817. //if fQueueSize = 0 then
  818. //begin
  819. // FQueueCondVar.SetEvent;
  820. //end;
  821. fQueue.Enqueue(AItem);
  822. Inc(FQueueSize);
  823. Inc(fTotalItemsPushed);
  824. finally
  825. AQueueSize := fQueueSize;
  826. FQueueLock.Leave;
  827. //FQueueCondVar.SetEvent;
  828. end;
  829. end;
  830. {$ELSE}
  831. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  832. begin
  833. FQueueLock.Enter;
  834. try
  835. Result := wrSignaled;
  836. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  837. //begin
  838. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  839. //end;
  840. if fShutDown or (Result <> wrSignaled) then Exit;
  841. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  842. fQueue.Enqueue(AItem);
  843. Inc(FQueueSize);
  844. Inc(fTotalItemsPushed);
  845. finally
  846. AQueueSize := fQueueSize;
  847. FQueueLock.Leave;
  848. end;
  849. end;
  850. {$ENDIF}
  851. procedure TThreadedQueueList<T>.DoShutDown;
  852. begin
  853. fShutDown := True;
  854. {$IFDEF FPC}
  855. FQueueCondVar.SetEvent;
  856. {$ELSE}
  857. FQueueCondVar.ReleaseAll;
  858. {$ENDIF}
  859. end;
  860. {$IFNDEF FPC}
  861. { TThreadObjectList<T> }
  862. procedure TThreadObjectList<T>.Add(const Item: T);
  863. begin
  864. LockList;
  865. try
  866. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  867. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  868. finally
  869. UnlockList;
  870. end;
  871. end;
  872. procedure TThreadObjectList<T>.Clear;
  873. begin
  874. LockList;
  875. try
  876. fList.Clear;
  877. finally
  878. UnlockList;
  879. end;
  880. end;
  881. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  882. begin
  883. inherited Create;
  884. fLock := TObject.Create;
  885. fList := TObjectList<T>.Create;
  886. fDuplicates := dupIgnore;
  887. end;
  888. destructor TThreadObjectList<T>.Destroy;
  889. begin
  890. LockList;
  891. try
  892. fList.Free;
  893. inherited Destroy;
  894. finally
  895. UnlockList;
  896. fLock.Free;
  897. end;
  898. end;
  899. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  900. begin
  901. LockList;
  902. try
  903. Result := fList[aIndex];
  904. finally
  905. UnlockList;
  906. end;
  907. end;
  908. function TThreadObjectList<T>.LockList: TObjectList<T>;
  909. begin
  910. System.TMonitor.Enter(fLock);
  911. Result := fList;
  912. end;
  913. procedure TThreadObjectList<T>.Remove(const Item: T);
  914. begin
  915. RemoveItem(Item, TDirection.FromBeginning);
  916. end;
  917. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  918. begin
  919. LockList;
  920. try
  921. fList.RemoveItem(Item, Direction);
  922. finally
  923. UnlockList;
  924. end;
  925. end;
  926. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  927. begin
  928. LockList;
  929. try
  930. fList[aIndex] := aValue;
  931. finally
  932. UnlockList;
  933. end;
  934. end;
  935. procedure TThreadObjectList<T>.UnlockList;
  936. begin
  937. System.TMonitor.Exit(fLock);
  938. end;
  939. {$ENDIF}
  940. { TAnonymousThread }
  941. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  942. begin
  943. fThread := TAdvThread.Create(aProc,aSynchronize);
  944. end;
  945. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  946. begin
  947. Result := TAnonymousThread.Create(aProc,False);
  948. end;
  949. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  950. begin
  951. Result := TAnonymousThread.Create(aProc,True);
  952. end;
  953. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  954. begin
  955. Result := Self;
  956. fThread.OnException(aProc);
  957. end;
  958. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  959. begin
  960. Result := Self;
  961. fThread.OnTerminate(aProc,False);
  962. end;
  963. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  964. begin
  965. Result := Self;
  966. fThread.OnTerminate(aProc,True);
  967. end;
  968. procedure TAnonymousThread.Start;
  969. begin
  970. fThread.Start;
  971. end;
  972. { TTask }
  973. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  974. var
  975. i : Integer;
  976. begin
  977. fTaskStatus := TWorkTaskStatus.wtsPending;
  978. fCustomFaultPolicy := False;
  979. fNumRetries := 0;
  980. fExecuteWithSync := False;
  981. fTerminateWithSync := False;
  982. fExceptionWithSync := False;
  983. fFaultControl := TFaultControl.Create;
  984. fFaultControl.OnRetry := DoRetry;
  985. fOwnedParams := aOwnedParams;
  986. fParamList := TParamList.Create(True);
  987. for i := Low(aParamArray) to High(aParamArray) do
  988. begin
  989. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  990. {$IFDEF FPC}
  991. fParamList[i].Value._AddRef;
  992. {$ENDIF}
  993. end;
  994. fExecuteProc := aTaskProc;
  995. fEnabled := False;
  996. end;
  997. destructor TTask.Destroy;
  998. var
  999. i : Integer;
  1000. begin
  1001. fFaultControl.Free;
  1002. //free passed params
  1003. fParamList.Free;
  1004. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1005. inherited;
  1006. end;
  1007. procedure TTask.DoException(aException : Exception);
  1008. begin
  1009. fTaskStatus := TWorkTaskStatus.wtsException;
  1010. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1011. else raise aException;
  1012. end;
  1013. procedure TTask.DoExecute;
  1014. begin
  1015. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1016. DoInitialize;
  1017. repeat
  1018. try
  1019. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1020. fTaskStatus := TWorkTaskStatus.wtsDone;
  1021. fFaultControl.SuccessExecution;
  1022. except
  1023. on E : Exception do
  1024. begin
  1025. fTaskStatus := TWorkTaskStatus.wtsException;
  1026. {$IFNDEF FPC}
  1027. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1028. {$ELSE}
  1029. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1030. {$ENDIF}
  1031. end;
  1032. end;
  1033. until not fFaultControl.NeedToRetry;
  1034. end;
  1035. procedure TTask.DoInitialize;
  1036. begin
  1037. try
  1038. fFaultControl.Reset;
  1039. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1040. except
  1041. on E : Exception do
  1042. begin
  1043. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1044. end;
  1045. end;
  1046. end;
  1047. function TTask.Done: Boolean;
  1048. begin
  1049. Result := not fFaultControl.TaskFailed;
  1050. end;
  1051. function TTask.Failed: Boolean;
  1052. begin
  1053. Result := fFaultControl.TaskFailed;
  1054. end;
  1055. function TTask.CircuitBreaked: Boolean;
  1056. begin
  1057. Result := fFaultControl.CircuitBreaked;
  1058. end;
  1059. function TTask.LastException: Exception;
  1060. begin
  1061. Result := fFaultControl.LastException;
  1062. end;
  1063. function TTask.MaxRetries: Integer;
  1064. begin
  1065. Result := fFaultControl.MaxRetries;
  1066. end;
  1067. function TTask.NumRetries: Integer;
  1068. begin
  1069. Result := fFaultControl.NumRetries;
  1070. end;
  1071. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1072. begin
  1073. vStopRetries := False;
  1074. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1075. end;
  1076. procedure TTask.DoTerminate;
  1077. begin
  1078. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1079. end;
  1080. function TTask.GetIdTask: Int64;
  1081. begin
  1082. Result := fIdTask;
  1083. end;
  1084. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1085. begin
  1086. {$IFDEF FPC}
  1087. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1088. {$ENDIF}
  1089. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1090. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1091. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1092. end;
  1093. procedure TTask.SetIdTask(Value : Int64);
  1094. begin
  1095. fIdTask := Value;
  1096. end;
  1097. function TTask.GetNumWorker: Integer;
  1098. begin
  1099. Result := fNumWorker;
  1100. end;
  1101. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1102. begin
  1103. Result := fParamList[aIndex].Value;
  1104. end;
  1105. function TTask.GetParam(const aName: string): TFlexValue;
  1106. var
  1107. paramvalue : TParamValue;
  1108. begin
  1109. for paramvalue in fParamList do
  1110. begin
  1111. if CompareText(paramvalue.Name,aName) = 0 then
  1112. begin
  1113. Exit(paramvalue.Value)
  1114. end;
  1115. end;
  1116. //if not exists
  1117. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1118. end;
  1119. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1120. begin
  1121. Result := @fParamList[aIndex].Value;
  1122. end;
  1123. function TTask.GetResult: TFlexValue;
  1124. begin
  1125. Result := fResult;
  1126. end;
  1127. function TTask.IsEnabled: Boolean;
  1128. begin
  1129. Result := fEnabled;
  1130. end;
  1131. procedure TTask.SetNumWorker(Value: Integer);
  1132. begin
  1133. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1134. fNumWorker := Value;
  1135. end;
  1136. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1137. begin
  1138. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1139. fParamList[aIndex].Value := Value;
  1140. end;
  1141. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1142. var
  1143. paramvalue : TParamValue;
  1144. begin
  1145. //check if already exists parameter
  1146. for paramvalue in fParamList do
  1147. begin
  1148. if CompareText(paramvalue.Name,aName) = 0 then
  1149. begin
  1150. paramvalue.Value := Value;
  1151. Exit;
  1152. end;
  1153. end;
  1154. //if not exists, create one
  1155. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1156. end;
  1157. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1158. begin
  1159. SetParam(aName,Value,False);
  1160. end;
  1161. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1162. begin
  1163. fFaultControl.MaxRetries := aMaxRetries;
  1164. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1165. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1166. fCustomFaultPolicy := True;
  1167. end;
  1168. procedure TTask.SetResult(aValue: TFlexValue);
  1169. begin
  1170. fResult := aValue;
  1171. end;
  1172. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1173. begin
  1174. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1175. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1176. fFaultControl.WaitTimeMultiplierFactor := 0;
  1177. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1178. fCustomFaultPolicy := True;
  1179. end;
  1180. function TTask.TaskStatus: TWorkTaskStatus;
  1181. begin
  1182. Result := fTaskStatus;
  1183. end;
  1184. { TWorkTask }
  1185. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1186. begin
  1187. fExceptProc := aTaskProc;
  1188. Result := Self;
  1189. end;
  1190. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1191. begin
  1192. fInitializeProc := aTaskProc;
  1193. Result := Self;
  1194. end;
  1195. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1196. begin
  1197. fRetryProc := aTaskProc;
  1198. Result := Self;
  1199. end;
  1200. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1201. begin
  1202. fTerminateProc := aTaskProc;
  1203. Result := Self;
  1204. end;
  1205. procedure TWorkTask.Run;
  1206. begin
  1207. fEnabled := True;
  1208. end;
  1209. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1210. begin
  1211. Result := Self;
  1212. SetParam(aName,aValue);
  1213. end;
  1214. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1215. begin
  1216. Result := Self;
  1217. SetParam(aName,aValue,aOwned);
  1218. end;
  1219. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1220. begin
  1221. Result := Self;
  1222. SetRetryPolicy(aMaxRetries,0,0);
  1223. end;
  1224. function TWorkTask.RetryForever: IWorkTask;
  1225. begin
  1226. Result := Self;
  1227. SetRetryPolicy(-1,0,0);
  1228. end;
  1229. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1230. begin
  1231. Result := Self;
  1232. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0);
  1233. end;
  1234. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1235. begin
  1236. Result := Self;
  1237. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1238. end;
  1239. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1240. begin
  1241. Result := Self;
  1242. SetRetryPolicy(aWaitTimeArray);
  1243. end;
  1244. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1245. begin
  1246. Result := Self;
  1247. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0);
  1248. end;
  1249. function TWorkTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1250. begin
  1251. Result := Self;
  1252. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1253. end;
  1254. { TBackgroundTasks }
  1255. procedure TBackgroundTasks.CancelAll;
  1256. begin
  1257. fTaskQueue.Clear;
  1258. end;
  1259. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  1260. begin
  1261. fMaxQueue := aMaxQueue;
  1262. fConcurrentWorkers := aConcurrentWorkers;
  1263. fInsertTimeout := INFINITE;
  1264. fExtractTimeout := 5000;
  1265. fNumPushedTasks := 0;
  1266. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  1267. end;
  1268. destructor TBackgroundTasks.Destroy;
  1269. begin
  1270. CancelAll;
  1271. fTaskQueue.DoShutDown;
  1272. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1273. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1274. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1275. inherited;
  1276. end;
  1277. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1278. begin
  1279. Result := fTaskQueue.QueueSize;
  1280. end;
  1281. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1282. begin
  1283. Result := AddTask([],False,aTaskProc);
  1284. end;
  1285. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1286. var
  1287. worktask : IWorkTask;
  1288. begin
  1289. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1290. Inc(fNumPushedTasks);
  1291. worktask.SetIdTask(fNumPushedTasks);
  1292. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1293. begin
  1294. Result := worktask;
  1295. end;
  1296. end;
  1297. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1298. begin
  1299. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1300. TTask(Result).ExecuteWithSync := True;
  1301. end;
  1302. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1303. begin
  1304. Result := AddTask_Sync([],False,aTaskProc);
  1305. end;
  1306. procedure TBackgroundTasks.Start;
  1307. var
  1308. i : Integer;
  1309. worker : TWorker;
  1310. begin
  1311. //create workers
  1312. if fWorkerPool <> nil then fWorkerPool.Free;
  1313. fWorkerPool := TWorkerPool.Create(True);
  1314. for i := 1 to fConcurrentWorkers do
  1315. begin
  1316. worker := TQueueWorker.Create(i,fTaskQueue);
  1317. fWorkerPool.Add(worker);
  1318. worker.Start;
  1319. end;
  1320. end;
  1321. { TWorker }
  1322. constructor TWorker.Create;
  1323. begin
  1324. inherited Create(True);
  1325. fDefaultFaultPolicy := TFaultPolicy.Create;
  1326. fStatus := TWorkerStatus.wsSuspended;
  1327. FreeOnTerminate := False;
  1328. end;
  1329. destructor TWorker.Destroy;
  1330. begin
  1331. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1332. inherited;
  1333. end;
  1334. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1335. begin
  1336. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1337. end;
  1338. procedure TWorker.Execute;
  1339. begin
  1340. end;
  1341. procedure TWorker.ExecuteTask;
  1342. begin
  1343. fCurrentTask.DoExecute;
  1344. end;
  1345. procedure TWorker.TerminateTask;
  1346. begin
  1347. fCurrentTask.DoTerminate;
  1348. end;
  1349. { TSimpleWorker }
  1350. constructor TSimpleWorker.Create(aTask : ITask);
  1351. begin
  1352. inherited Create;
  1353. fCurrentTask := aTask;
  1354. FreeOnTerminate := True;
  1355. end;
  1356. procedure TSimpleWorker.Execute;
  1357. begin
  1358. fStatus := TWorkerStatus.wsIdle;
  1359. while not Terminated do
  1360. begin
  1361. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1362. try
  1363. fStatus := TWorkerStatus.wsWorking;
  1364. try
  1365. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1366. else fCurrentTask.DoExecute;
  1367. except
  1368. on E : Exception do
  1369. begin
  1370. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1371. else raise ETaskExecutionError.Create(e.Message);
  1372. end;
  1373. end;
  1374. finally
  1375. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1376. else fCurrentTask.DoTerminate;
  1377. fStatus := TWorkerStatus.wsIdle;
  1378. Terminate;
  1379. end;
  1380. end;
  1381. fStatus := TWorkerStatus.wsSuspended
  1382. end;
  1383. { TQueueWorker }
  1384. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1385. begin
  1386. inherited Create;
  1387. fNumWorker := aNumWorker;
  1388. fTaskQueue := aTaskQueue;
  1389. end;
  1390. procedure TQueueWorker.Execute;
  1391. begin
  1392. fStatus := TWorkerStatus.wsIdle;
  1393. while not Terminated do
  1394. begin
  1395. fCurrentTask := fTaskQueue.PopItem;
  1396. if fCurrentTask <> nil then
  1397. try
  1398. fStatus := TWorkerStatus.wsWorking;
  1399. try
  1400. fCurrentIdTask := fCurrentTask.GetIdTask;
  1401. SetFaultPolicy(TTask(fCurrentTask));
  1402. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1403. else fCurrentTask.DoExecute;
  1404. except
  1405. on E : Exception do
  1406. begin
  1407. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1408. else raise ETaskExecutionError.Create(e.Message);
  1409. end;
  1410. end;
  1411. finally
  1412. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1413. else fCurrentTask.DoTerminate;
  1414. fStatus := TWorkerStatus.wsIdle;
  1415. end;
  1416. end;
  1417. fStatus := TWorkerStatus.wsSuspended
  1418. end;
  1419. { TScheduledWorker }
  1420. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1421. begin
  1422. inherited Create;
  1423. {$IFNDEF DELPHILINUX}
  1424. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1425. {$ENDIF}
  1426. FreeOnTerminate := True;
  1427. fCurrentTask := aScheduledTask;
  1428. end;
  1429. procedure TScheduledWorker.Execute;
  1430. begin
  1431. fStatus := TWorkerStatus.wsIdle;
  1432. if Assigned(fCurrentTask) then
  1433. begin
  1434. try
  1435. fStatus := TWorkerStatus.wsWorking;
  1436. try
  1437. SetFaultPolicy(TTask(fCurrentTask));
  1438. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1439. else fCurrentTask.DoExecute;
  1440. fStatus := TWorkerStatus.wsIdle;
  1441. except
  1442. on E : Exception do
  1443. begin
  1444. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1445. else raise ETaskExecutionError.Create(e.Message);
  1446. end;
  1447. end;
  1448. finally
  1449. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1450. else fCurrentTask.DoTerminate;
  1451. //check if expired
  1452. if (fCurrentTask as IScheduledTask).IsFinished then
  1453. begin
  1454. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1455. else (fCurrentTask as IScheduledTask).DoExpire;
  1456. end;
  1457. end;
  1458. end;
  1459. fCurrentTask := nil;
  1460. fStatus := TWorkerStatus.wsSuspended;
  1461. end;
  1462. procedure TScheduledWorker.ExpireTask;
  1463. begin
  1464. (fCurrentTask as IScheduledTask).DoExpire;
  1465. end;
  1466. { TScheduledTasks }
  1467. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1468. begin
  1469. Result := AddTask(aTaskName,[],False,aTaskProc);
  1470. end;
  1471. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1472. var
  1473. scheduletask : TScheduledTask;
  1474. begin
  1475. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1476. scheduletask.Name := aTaskName;
  1477. Inc(fNumPushedTasks);
  1478. scheduletask.SetIdTask(fNumPushedTasks);
  1479. fTaskList.Add(scheduletask);
  1480. Result := scheduletask;
  1481. end;
  1482. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1483. begin
  1484. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1485. TTask(Result).ExecuteWithSync := True;
  1486. end;
  1487. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1488. begin
  1489. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1490. end;
  1491. constructor TScheduledTasks.Create;
  1492. begin
  1493. fNumPushedTasks := 0;
  1494. fIsStarted := False;
  1495. fFaultPolicy := TFaultPolicy.Create;
  1496. fTaskList := TScheduledTaskList.Create;
  1497. end;
  1498. destructor TScheduledTasks.Destroy;
  1499. begin
  1500. if Assigned(fScheduler) then
  1501. begin
  1502. fScheduler.Terminate;
  1503. fScheduler.WaitFor;
  1504. fScheduler.Free;
  1505. end;
  1506. if Assigned(fTaskList) then fTaskList.Free;
  1507. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1508. inherited;
  1509. end;
  1510. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1511. begin
  1512. Result := fScheduler.Get(aIdTask);
  1513. end;
  1514. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1515. begin
  1516. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1517. Result := fScheduler.Get(aTaskName);
  1518. end;
  1519. procedure TScheduledTasks.Start;
  1520. begin
  1521. if fIsStarted then Exit;
  1522. if not Assigned(fScheduler) then
  1523. begin
  1524. fScheduler := TScheduler.Create(fTaskList);
  1525. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1526. end;
  1527. fScheduler.Start;
  1528. fIsStarted := True;
  1529. end;
  1530. procedure TScheduledTasks.Stop;
  1531. begin
  1532. if Assigned(fScheduler) then fScheduler.Terminate;
  1533. fIsStarted := False;
  1534. end;
  1535. { TScheduledTask }
  1536. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1537. begin
  1538. Result := Self;
  1539. SetParam(aName,aValue);
  1540. end;
  1541. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1542. begin
  1543. Result := Self;
  1544. SetParam(aName,aValue);
  1545. end;
  1546. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1547. begin
  1548. Result := Self;
  1549. ClearSchedule;
  1550. fScheduleMode := TScheduleMode.smRunOnce;
  1551. fStartDate := aStartDate;
  1552. fNextExecution := aStartDate;
  1553. end;
  1554. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1555. begin
  1556. Result := Self;
  1557. ClearSchedule;
  1558. fScheduleMode := TScheduleMode.smRunOnce;
  1559. fStartDate := IncMinute(Now(),aMinutes);
  1560. fNextExecution := fStartDate;
  1561. end;
  1562. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1563. begin
  1564. Result := Self;
  1565. ClearSchedule;
  1566. fScheduleMode := TScheduleMode.smRunOnce;
  1567. fStartDate := IncSecond(Now(),aSeconds);
  1568. fNextExecution := fStartDate;
  1569. end;
  1570. function TScheduledTask.StartNow: IScheduledTask;
  1571. begin
  1572. Result := Self;
  1573. ClearSchedule;
  1574. fScheduleMode := TScheduleMode.smRunOnce;
  1575. fStartDate := Now();
  1576. fNextExecution := fStartDate;
  1577. end;
  1578. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1579. begin
  1580. Result := Self;
  1581. ClearSchedule;
  1582. fScheduleMode := TScheduleMode.smRunOnce;
  1583. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1584. fNextExecution := fStartDate;
  1585. end;
  1586. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1587. begin
  1588. Result := Self;
  1589. ClearSchedule;
  1590. fScheduleMode := TScheduleMode.smRunOnce;
  1591. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1592. fNextExecution := fStartDate;
  1593. end;
  1594. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1595. begin
  1596. Result := Self;
  1597. ClearSchedule;
  1598. fScheduleMode := TScheduleMode.smRunOnce;
  1599. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1600. fNextExecution := fStartDate;
  1601. end;
  1602. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1603. begin
  1604. Result := Self;
  1605. SetRetryPolicy(aMaxRetries,0,0);
  1606. end;
  1607. function TScheduledTask.RetryForever: IScheduledTask;
  1608. begin
  1609. Result := Self;
  1610. SetRetryPolicy(-1,0,0);
  1611. end;
  1612. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1613. begin
  1614. Result := Self;
  1615. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0);
  1616. end;
  1617. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1618. begin
  1619. Result := Self;
  1620. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1621. end;
  1622. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1623. begin
  1624. Result := Self;
  1625. SetRetryPolicy(aWaitTimeArray);
  1626. end;
  1627. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1628. begin
  1629. Result := Self;
  1630. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0);
  1631. end;
  1632. function TScheduledTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1633. begin
  1634. Result := Self;
  1635. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1636. end;
  1637. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1638. begin
  1639. if fStartDate = 0.0 then ClearSchedule;
  1640. fScheduleMode := TScheduleMode.smRepeatMode;
  1641. fTimeMeasure := aTimeMeasure;
  1642. fTimeInterval := aInterval;
  1643. if fStartDate < Now() then fStartDate := Now();
  1644. fNextExecution := fStartDate;
  1645. fEnabled := True;
  1646. end;
  1647. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1648. begin
  1649. if fStartDate = 0.0 then ClearSchedule;
  1650. fScheduleMode := TScheduleMode.smRepeatMode;
  1651. fTimeMeasure := aTimeMeasure;
  1652. fTimeInterval := aInterval;
  1653. if fStartDate < Now() then fStartDate := Now();
  1654. fExpirationDate := aEndTime;
  1655. fNextExecution := fStartDate;
  1656. fEnabled := True;
  1657. end;
  1658. procedure TScheduledTask.RepeatEveryDay;
  1659. begin
  1660. RepeatEvery(1,tmDays);
  1661. end;
  1662. procedure TScheduledTask.RepeatEveryWeek;
  1663. begin
  1664. RepeatEvery(7,tmDays);
  1665. end;
  1666. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1667. begin
  1668. if fStartDate = 0.0 then ClearSchedule;
  1669. fScheduleMode := TScheduleMode.smRepeatMode;
  1670. fTimeMeasure := aTimeMeasure;
  1671. fTimeInterval := aInterval;
  1672. if fStartDate < Now() then fStartDate := Now();
  1673. fExpirationTimes := aRepeatTimes;
  1674. fNextExecution := fStartDate;
  1675. fEnabled := True;
  1676. end;
  1677. procedure TScheduledTask.RunOnce;
  1678. begin
  1679. fScheduleMode := TScheduleMode.smRunOnce;
  1680. if fStartDate < Now() then fStartDate := Now();
  1681. fNextExecution := fStartDate;
  1682. fEnabled := True;
  1683. end;
  1684. procedure TScheduledTask.Cancel;
  1685. begin
  1686. fFinished := True;
  1687. end;
  1688. function TScheduledTask.CheckSchedule: Boolean;
  1689. begin
  1690. Result := False;
  1691. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1692. if fScheduleMode = TScheduleMode.smRunOnce then
  1693. begin
  1694. //if start date reached
  1695. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1696. begin
  1697. fLastExecution := Now();
  1698. Inc(fExecutionTimes);
  1699. fFinished := True;
  1700. Result := True;
  1701. end;
  1702. end
  1703. else
  1704. begin
  1705. //if next execution reached
  1706. if Now() >= fNextExecution then
  1707. begin
  1708. //check expiration limits
  1709. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1710. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1711. begin
  1712. fFinished := True;
  1713. Exit;
  1714. end;
  1715. //calculate next execution
  1716. case fTimeMeasure of
  1717. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1718. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1719. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1720. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1721. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1722. end;
  1723. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1724. else
  1725. begin
  1726. fLastExecution := Now();
  1727. Inc(fExecutionTimes);
  1728. Result := True;
  1729. end;
  1730. end;
  1731. end;
  1732. end;
  1733. procedure TScheduledTask.ClearSchedule;
  1734. begin
  1735. inherited;
  1736. fFinished := False;
  1737. fStartDate := 0.0;
  1738. fLastExecution := 0.0;
  1739. fNextExecution := 0.0;
  1740. fExpirationDate := 0.0;
  1741. fScheduleMode := TScheduleMode.smRunOnce;
  1742. fTimeMeasure := TTimeMeasure.tmSeconds;
  1743. fTimeInterval := 0;
  1744. end;
  1745. procedure TScheduledTask.DoExpire;
  1746. begin
  1747. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1748. fEnabled := False;
  1749. end;
  1750. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1751. begin
  1752. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1753. end;
  1754. function TScheduledTask.GetTaskName: string;
  1755. begin
  1756. Result := fName;
  1757. end;
  1758. function TScheduledTask.IsFinished: Boolean;
  1759. begin
  1760. Result := fFinished;
  1761. end;
  1762. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1763. begin
  1764. fExceptProc := aTaskProc;
  1765. Result := Self;
  1766. end;
  1767. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1768. begin
  1769. Result := OnException(aTaskProc);
  1770. TTask(Result).ExceptionWithSync := True;
  1771. end;
  1772. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1773. begin
  1774. fRetryProc := aTaskProc;
  1775. Result := Self;
  1776. end;
  1777. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1778. begin
  1779. fExpiredProc := aTaskProc;
  1780. Result := Self;
  1781. end;
  1782. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1783. begin
  1784. Result := OnExpired(aTaskProc);
  1785. TScheduledTask(Result).ExpireWithSync := True;
  1786. end;
  1787. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1788. begin
  1789. fInitializeProc := aTaskProc;
  1790. Result := Self;
  1791. end;
  1792. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1793. begin
  1794. fTerminateProc := aTaskProc;
  1795. Result := Self;
  1796. end;
  1797. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1798. begin
  1799. Result := OnTerminated(aTaskProc);
  1800. TTask(Result).TerminateWithSync := True;
  1801. end;
  1802. { TScheduler }
  1803. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1804. begin
  1805. inherited Create(True);
  1806. FreeOnTerminate := False;
  1807. fListLock := TCriticalSection.Create;
  1808. fRemoveTaskAfterExpiration := False;
  1809. fTaskList := aTaskList;
  1810. {$IFDEF FPC}
  1811. fCondVar := TSimpleEvent.Create;
  1812. {$ELSE}
  1813. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1814. {$ENDIF}
  1815. end;
  1816. destructor TScheduler.Destroy;
  1817. begin
  1818. fCondVar.SetEvent;
  1819. fCondVar.Free;
  1820. fTaskList := nil;
  1821. fListLock.Free;
  1822. inherited;
  1823. end;
  1824. procedure TScheduler.Execute;
  1825. var
  1826. task : IScheduledTask;
  1827. worker : TScheduledWorker;
  1828. numworker : Int64;
  1829. begin
  1830. numworker := 0;
  1831. while not Terminated do
  1832. begin
  1833. fListLock.Enter;
  1834. try
  1835. for task in fTaskList do
  1836. begin
  1837. if (task.IsEnabled) and (not task.IsFinished) then
  1838. begin
  1839. if task.CheckSchedule then
  1840. begin
  1841. Inc(numworker);
  1842. worker := TScheduledWorker.Create(numworker,task);
  1843. worker.Start;
  1844. end;
  1845. end
  1846. else
  1847. begin
  1848. if task.IsEnabled then
  1849. begin
  1850. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1851. // else task.DoExpire;
  1852. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1853. end;
  1854. end;
  1855. end;
  1856. task := nil;
  1857. finally
  1858. fListLock.Leave;
  1859. end;
  1860. fCondVar.WaitFor(250);
  1861. end;
  1862. end;
  1863. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1864. begin
  1865. Result := fTaskList.Add(aTask);
  1866. end;
  1867. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1868. var
  1869. task : IScheduledTask;
  1870. begin
  1871. fListLock.Enter;
  1872. try
  1873. for task in fTaskList do
  1874. begin
  1875. if task.IdTask = aIdTask then Exit(task);
  1876. end;
  1877. finally
  1878. fListLock.Leave;
  1879. end;
  1880. end;
  1881. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1882. var
  1883. task : IScheduledTask;
  1884. begin
  1885. fListLock.Enter;
  1886. try
  1887. for task in fTaskList do
  1888. begin
  1889. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1890. end;
  1891. finally
  1892. fListLock.Leave;
  1893. end;
  1894. end;
  1895. { TAdvThread }
  1896. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1897. begin
  1898. inherited Create(True);
  1899. FreeOnTerminate := True;
  1900. fExecuteWithSync := aSynchronize;
  1901. fExecuteProc := aProc;
  1902. end;
  1903. procedure TAdvThread.DoExecute;
  1904. begin
  1905. try
  1906. if Assigned(fExecuteProc) then fExecuteProc;
  1907. except
  1908. on E : Exception do
  1909. begin
  1910. {$IFNDEF FPC}
  1911. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  1912. {$ELSE}
  1913. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1914. {$ENDIF}
  1915. else raise e;
  1916. end;
  1917. end;
  1918. end;
  1919. procedure TAdvThread.CallToTerminate;
  1920. begin
  1921. if Assigned(fTerminateProc) then fTerminateProc;
  1922. end;
  1923. procedure TAdvThread.DoTerminate;
  1924. begin
  1925. if fTerminateWithSync then Synchronize(CallToTerminate)
  1926. else CallToTerminate;
  1927. end;
  1928. procedure TAdvThread.Execute;
  1929. begin
  1930. if fExecuteWithSync then Synchronize(DoExecute)
  1931. else DoExecute;
  1932. end;
  1933. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  1934. begin
  1935. fExceptionProc := aProc;
  1936. end;
  1937. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1938. begin
  1939. fTerminateWithSync := aSynchronize;
  1940. fTerminateProc := aProc;
  1941. end;
  1942. { TRunTask }
  1943. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  1944. begin
  1945. Result := Execute([],False,aTaskProc);
  1946. end;
  1947. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  1948. begin
  1949. Result := Execute_Sync([],False,aTaskProc);
  1950. end;
  1951. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1952. var
  1953. task : TWorkTask;
  1954. worker : TSimpleWorker;
  1955. begin
  1956. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1957. task.ExecuteWithSync := False;
  1958. Result := task;
  1959. worker := TSimpleWorker.Create(task);
  1960. worker.Start;
  1961. end;
  1962. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1963. var
  1964. task : TWorkTask;
  1965. worker : TSimpleWorker;
  1966. begin
  1967. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1968. task.ExecuteWithSync := True;
  1969. Result := task;
  1970. worker := TSimpleWorker.Create(task);
  1971. worker.Start;
  1972. end;
  1973. { TParamValue }
  1974. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  1975. begin
  1976. inherited Create;
  1977. fName := aName;
  1978. fValue := aValue;
  1979. fOwned := aOwnedValue;
  1980. end;
  1981. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  1982. begin
  1983. inherited Create;
  1984. fName := aName;
  1985. fValue := aValue;
  1986. fOwned := aOwnedValue;
  1987. end;
  1988. constructor TParamValue.Create;
  1989. begin
  1990. fName := '';
  1991. fOwned := False;
  1992. end;
  1993. destructor TParamValue.Destroy;
  1994. begin
  1995. {$IFDEF FPC}
  1996. fValue._Release;
  1997. {$ENDIF}
  1998. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  1999. inherited;
  2000. end;
  2001. end.