Quick.Threads.pas 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465
  1. { ***************************************************************************
  2. Copyright (c) 2016-2021 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 08/03/2021
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue;
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. procedure Enable;
  209. procedure Disable;
  210. {$IFNDEF FPC}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  212. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ELSE}
  214. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  215. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  216. {$ENDIF}
  217. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  218. property Result : TFlexValue read GetResult write SetResult;
  219. property IdTask : Int64 read GetIdTask;
  220. function Done : Boolean;
  221. function Failed : Boolean;
  222. function NumRetries : Integer;
  223. function MaxRetries : Integer;
  224. function LastException : Exception;
  225. function CircuitBreaked : Boolean;
  226. function IsEnabled : Boolean;
  227. end;
  228. {$IFNDEF FPC}
  229. TTaskProc = reference to procedure(task : ITask);
  230. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  231. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  232. {$ELSE}
  233. TTaskProc = procedure(task : ITask) of object;
  234. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  235. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  236. {$ENDIF}
  237. IWorkTask = interface(ITask)
  238. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  239. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  240. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  241. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  242. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  243. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  244. function Retry(aMaxRetries : Integer) : IWorkTask;
  245. function RetryForever : IWorkTask;
  246. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  247. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  248. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  249. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  250. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  251. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  252. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  253. procedure Run;
  254. end;
  255. IScheduledTask = interface(ITask)
  256. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  257. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  259. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  260. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  261. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  262. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  263. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  264. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  265. function Retry(aMaxRetries : Integer) : IScheduledTask;
  266. function RetryForever : IScheduledTask;
  267. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  268. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  269. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  270. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  271. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  272. function CheckSchedule : Boolean;
  273. procedure DoExpire;
  274. function GetTaskName : string;
  275. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  276. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  277. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  278. function StartOnDayChange : IScheduledTask;
  279. function StartNow : IScheduledTask;
  280. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  281. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  282. procedure RunOnce;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  284. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  285. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  286. procedure RepeatEveryDay;
  287. procedure RepeatEveryWeek;
  288. function IsFinished : Boolean;
  289. procedure Cancel;
  290. property Name : string read GetTaskName;
  291. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  292. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  293. end;
  294. TTask = class(TInterfacedObject,ITask)
  295. private
  296. fIdTask : Int64;
  297. fNumWorker : Integer;
  298. fNumRetries : Integer;
  299. fParamList : TParamList;
  300. fInitializeProc : TTaskProc;
  301. fExecuteProc : TTaskProc;
  302. fExceptProc : TTaskExceptionProc;
  303. fTerminateProc : TTaskProc;
  304. fExpiredProc : TTaskProc;
  305. fTaskStatus : TWorkTaskStatus;
  306. fOwnedParams : Boolean;
  307. fEnabled : Boolean;
  308. fExecuteWithSync : Boolean;
  309. fExceptionWithSync : Boolean;
  310. fRetryProc : TTaskRetryProc;
  311. fTerminateWithSync : Boolean;
  312. fFaultControl : TFaultControl;
  313. fCustomFaultPolicy : Boolean;
  314. fResult : TFlexValue;
  315. function GetParam(aIndex : Integer) : TFlexValue; overload;
  316. function GetParam(const aName : string) : TFlexValue; overload;
  317. function GetParam2(aIndex : Integer) : PFlexValue;
  318. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  319. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  320. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  321. procedure DoInitialize;
  322. procedure DoExecute;
  323. procedure DoException(aException : Exception);
  324. procedure DoTerminate;
  325. function GetNumWorker : Integer;
  326. procedure SetNumWorker(Value : Integer);
  327. function GetIdTask : Int64;
  328. procedure SetIdTask(Value : Int64);
  329. function GetResult : TFlexValue;
  330. procedure SetResult(aValue : TFlexValue);
  331. protected
  332. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  333. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  334. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  335. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  336. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  337. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  338. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  339. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  340. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  341. public
  342. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  343. destructor Destroy; override;
  344. property IdTask : Int64 read GetIdTask;
  345. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  346. function IsEnabled : Boolean;
  347. function TaskStatus : TWorkTaskStatus;
  348. function Done : Boolean;
  349. function Failed : Boolean;
  350. function NumRetries : Integer;
  351. function MaxRetries : Integer;
  352. function LastException : Exception;
  353. function CircuitBreaked : Boolean;
  354. procedure Disable;
  355. procedure Enable;
  356. end;
  357. TWorkTask = class(TTask,IWorkTask)
  358. public
  359. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  360. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  361. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  362. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  363. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  364. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  365. function Retry(aMaxRetries : Integer) : IWorkTask;
  366. function RetryForever : IWorkTask;
  367. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  368. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  369. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  370. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  371. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  372. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  373. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  374. procedure Run; virtual;
  375. end;
  376. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  377. TScheduledTask = class(TTask,IScheduledTask)
  378. private
  379. fName : string;
  380. fExecutionTimes : Integer;
  381. fScheduleMode : TScheduleMode;
  382. fTimeInterval : Integer;
  383. fTimeMeasure : TTimeMeasure;
  384. fStartDate : TDateTime;
  385. fLastExecution : TDateTime;
  386. fNextExecution : TDateTime;
  387. fExpirationDate : TDateTime;
  388. fExpirationTimes : Integer;
  389. fFinished : Boolean;
  390. fExpireWithSync: Boolean;
  391. procedure ClearSchedule;
  392. function CheckSchedule : Boolean;
  393. procedure DoExpire;
  394. function GetTaskName : string;
  395. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  396. protected
  397. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  398. public
  399. property Name : string read fName write fName;
  400. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  401. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  402. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  403. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  404. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  405. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  406. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  407. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  408. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  409. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  410. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  411. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  412. function StartOnDayChange : IScheduledTask;
  413. function StartNow : IScheduledTask;
  414. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  415. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  416. procedure RunOnce;
  417. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  418. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  419. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  420. procedure RepeatEveryDay;
  421. procedure RepeatEveryWeek;
  422. function Retry(aMaxRetries : Integer) : IScheduledTask;
  423. function RetryForever : IScheduledTask;
  424. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  425. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  426. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  427. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  428. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  429. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  430. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  431. function IsFinished : Boolean;
  432. procedure Cancel;
  433. end;
  434. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  435. TWorker = class(TThread)
  436. protected
  437. fStatus : TWorkerStatus;
  438. fCurrentTask : ITask;
  439. fDefaultFaultPolicy : TFaultPolicy;
  440. procedure ExecuteTask;
  441. procedure TerminateTask;
  442. public
  443. constructor Create;
  444. destructor Destroy; override;
  445. property Status : TWorkerStatus read fStatus;
  446. procedure SetFaultPolicy(aTask : TTask);
  447. procedure Execute; override;
  448. end;
  449. TSimpleWorker = class(TWorker)
  450. private
  451. fRunOnce : Boolean;
  452. public
  453. constructor Create(aTask : ITask; aRunOnce : Boolean = True);
  454. procedure Execute; override;
  455. end;
  456. TQueueWorker = class(TWorker)
  457. private
  458. fCurrentIdTask : Integer;
  459. fNumWorker : Integer;
  460. fTaskQueue : TTaskQueue;
  461. public
  462. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  463. property NumWorker : Integer read fNumWorker;
  464. procedure Execute; override;
  465. end;
  466. TScheduledWorker = class(TWorker)
  467. private
  468. procedure ExpireTask;
  469. public
  470. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  471. procedure Execute; override;
  472. end;
  473. TWorkerPool = TObjectList<TWorker>;
  474. TRunTask = class
  475. public
  476. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  477. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  478. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  479. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  480. end;
  481. TBackgroundTasks = class
  482. private
  483. fMaxQueue : Integer;
  484. fWorkerPool : TWorkerPool;
  485. fConcurrentWorkers : Integer;
  486. fInsertTimeout : Cardinal;
  487. fExtractTimeout : Cardinal;
  488. fTaskQueue : TTaskQueue;
  489. fNumPushedTasks : Int64;
  490. function GetTaskQueue : Cardinal;
  491. public
  492. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  493. destructor Destroy; override;
  494. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  495. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  496. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  497. property TaskQueued : Cardinal read GetTaskQueue;
  498. property NumPushedTasks : Int64 read fNumPushedTasks;
  499. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  500. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  501. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  502. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  503. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  504. procedure Start;
  505. procedure CancelAll;
  506. end;
  507. TScheduledTaskList = TList<IScheduledTask>;
  508. TScheduler = class(TThread)
  509. private
  510. fListLock : TCriticalSection;
  511. fCondVar : TSimpleEvent;
  512. fTaskList : TScheduledTaskList;
  513. fRemoveTaskAfterExpiration : Boolean;
  514. public
  515. constructor Create(aTaskList : TScheduledTaskList);
  516. destructor Destroy; override;
  517. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  518. procedure Execute; override;
  519. function Add(aTask : TScheduledTask) : Integer;
  520. function Get(aIdTask : Int64) : IScheduledTask; overload;
  521. function Get(const aTaskName : string) : IScheduledTask; overload;
  522. end;
  523. TScheduledTasks = class
  524. private
  525. fTaskList : TScheduledTaskList;
  526. fScheduler : TScheduler;
  527. fNumPushedTasks : Int64;
  528. fRemoveTaskAfterExpiration : Boolean;
  529. fIsStarted : Boolean;
  530. fFaultPolicy : TFaultPolicy;
  531. public
  532. constructor Create;
  533. destructor Destroy; override;
  534. property NumPushedTasks : Int64 read fNumPushedTasks;
  535. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  536. property IsStarted : Boolean read fIsStarted;
  537. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  538. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  539. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  540. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  541. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  542. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  543. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  544. procedure Start;
  545. procedure Stop;
  546. end;
  547. TBackgroundWorkers = class
  548. private
  549. fWorkerPool : TWorkerPool;
  550. fConcurrentWorkers : Integer;
  551. fWorkerInitProc : TTaskProc;
  552. fWorkerExecuteProc : TTaskProc;
  553. fWorkerRetryProc : TTaskRetryProc;
  554. fWorkerExceptionProc : TTaskExceptionProc;
  555. fWorkerTerminateProc : TTaskProc;
  556. fMaxRetries : Integer;
  557. fFaultPolicy : TFaultPolicy;
  558. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  559. public
  560. constructor Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  561. destructor Destroy; override;
  562. property ConcurrentWorkers : Integer read fConcurrentWorkers;
  563. function OnInitialize(aTaskProc : TTaskProc) : TBackgroundWorkers;
  564. function OnException(aTaskProc : TTaskExceptionProc) : TBackgroundWorkers;
  565. function OnRetry(aTaskProc : TTaskRetryProc) : TBackgroundWorkers;
  566. function OnTerminated(aTaskProc : TTaskProc) : TBackgroundWorkers;
  567. function Retry(aMaxRetries : Integer) : TBackgroundWorkers;
  568. function RetryForever : TBackgroundWorkers;
  569. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  570. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : TBackgroundWorkers; overload;
  571. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  572. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  573. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  574. procedure Start;
  575. procedure Stop;
  576. end;
  577. implementation
  578. { TThreadedQueueCS<T> }
  579. procedure TThreadedQueueCS<T>.Clear;
  580. var
  581. obj : T;
  582. begin
  583. FQueueLock.Enter;
  584. try
  585. for obj in FQueue do
  586. begin
  587. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  588. end;
  589. SetLength(FQueue,0);
  590. finally
  591. FQueueLock.Leave;
  592. end;
  593. {$IFDEF FPC}
  594. FQueueCondVar.SetEvent;
  595. {$ELSE}
  596. FQueueCondVar.ReleaseAll;
  597. {$ENDIF}
  598. end;
  599. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  600. begin
  601. inherited Create;
  602. if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
  603. SetLength(FQueue, AQueueDepth);
  604. FQueueLock := TCriticalSection.Create;
  605. {$IFDEF FPC}
  606. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  607. {$ELSE}
  608. FQueueCondVar := TConditionVariableCS.Create;
  609. {$ENDIF}
  610. FPushTimeout := PushTimeout;
  611. FPopTimeout := PopTimeout;
  612. end;
  613. destructor TThreadedQueueCS<T>.Destroy;
  614. begin
  615. DoShutDown;
  616. FQueueLock.Free;
  617. FQueueCondVar.Free;
  618. inherited;
  619. end;
  620. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  621. begin
  622. FQueueLock.Enter;
  623. try
  624. SetLength(FQueue, Length(FQueue) + ADelta);
  625. finally
  626. FQueueLock.Leave;
  627. end;
  628. {$IFDEF FPC}
  629. FQueueCondVar.SetEvent;
  630. {$ELSE}
  631. FQueueCondVar.ReleaseAll;
  632. {$ENDIF}
  633. end;
  634. function TThreadedQueueCS<T>.PopItem: T;
  635. var
  636. LQueueSize: Integer;
  637. begin
  638. PopItem(LQueueSize, Result);
  639. end;
  640. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  641. begin
  642. AItem := Default(T);
  643. FQueueLock.Enter;
  644. try
  645. Result := wrSignaled;
  646. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  647. begin
  648. {$IFDEF FPC}
  649. Result := FQueueCondVar.WaitFor(FPopTimeout);
  650. {$ELSE}
  651. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  652. {$ENDIF}
  653. end;
  654. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  655. AItem := FQueue[FQueueOffset];
  656. FQueue[FQueueOffset] := Default(T);
  657. if FQueueSize = Length(FQueue) then
  658. begin
  659. {$IFDEF FPC}
  660. FQueueCondVar.SetEvent;
  661. {$ELSE}
  662. FQueueCondVar.ReleaseAll;
  663. {$ENDIF}
  664. end;
  665. Dec(FQueueSize);
  666. Inc(FQueueOffset);
  667. Inc(FTotalItemsPopped);
  668. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  669. finally
  670. AQueueSize := FQueueSize;
  671. FQueueLock.Leave;
  672. end;
  673. end;
  674. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  675. var
  676. LQueueSize: Integer;
  677. begin
  678. Result := PopItem(LQueueSize, AItem);
  679. end;
  680. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  681. begin
  682. PopItem(AQueueSize, Result);
  683. end;
  684. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  685. var
  686. LQueueSize: Integer;
  687. begin
  688. Result := PushItem(AItem, LQueueSize);
  689. end;
  690. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  691. begin
  692. FQueueLock.Enter;
  693. try
  694. if FQueueSize >= High(FQueue) then
  695. begin
  696. if FQueueSize < 1024 then Grow(FQueueSize)
  697. else Grow(FQueueSize Div 2);
  698. end;
  699. Result := wrSignaled;
  700. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  701. begin
  702. {$IFDEF FPC}
  703. Result := FQueueCondVar.WaitFor(FPushTimeout);
  704. {$ELSE}
  705. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  706. {$ENDIF}
  707. end;
  708. if FShutDown or (Result <> wrSignaled) then Exit;
  709. if FQueueSize = 0 then
  710. begin
  711. {$IFDEF FPC}
  712. FQueueCondVar.SetEvent;
  713. {$ELSE}
  714. FQueueCondVar.ReleaseAll;
  715. {$ENDIF}
  716. end;
  717. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  718. Inc(FQueueSize);
  719. Inc(FTotalItemsPushed);
  720. finally
  721. AQueueSize := FQueueSize;
  722. FQueueLock.Leave;
  723. end;
  724. end;
  725. procedure TThreadedQueueCS<T>.DoShutDown;
  726. begin
  727. FShutDown := True;
  728. {$IFDEF FPC}
  729. FQueueCondVar.SetEvent;
  730. {$ELSE}
  731. FQueueCondVar.ReleaseAll;
  732. {$ENDIF}
  733. end;
  734. { TThreadedQueueList<T> }
  735. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  736. begin
  737. inherited Create;
  738. fQueue := TQueue<T>.Create;
  739. fQueue.Capacity := AQueueDepth;
  740. fQueueSize := 0;
  741. fQueueLock := TCriticalSection.Create;
  742. {$IFDEF FPC}
  743. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  744. {$ELSE}
  745. fQueueCondVar := TConditionVariableCS.Create;
  746. {$ENDIF}
  747. fPushTimeout := PushTimeout;
  748. fPopTimeout := PopTimeout;
  749. end;
  750. destructor TThreadedQueueList<T>.Destroy;
  751. begin
  752. DoShutDown;
  753. fQueueLock.Free;
  754. fQueueCondVar.Free;
  755. fQueue.Free;
  756. inherited;
  757. end;
  758. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  759. begin
  760. fQueueLock.Enter;
  761. try
  762. fQueue.Capacity := fQueue.Capacity + ADelta;
  763. finally
  764. fQueueLock.Leave;
  765. end;
  766. {$IFDEF FPC}
  767. FQueueCondVar.SetEvent;
  768. {$ELSE}
  769. FQueueCondVar.ReleaseAll;
  770. {$ENDIF}
  771. end;
  772. function TThreadedQueueList<T>.PopItem: T;
  773. var
  774. LQueueSize: Integer;
  775. begin
  776. PopItem(LQueueSize, Result);
  777. end;
  778. {$IFDEF FPC}
  779. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  780. //var
  781. //crono : TChronometer;
  782. begin
  783. AItem := Default(T);
  784. //crono := TChronometer.Create(False);
  785. try
  786. Result := wrSignaled;
  787. //writeln('popitem');
  788. //crono.Start;
  789. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  790. begin
  791. //crono.Start;
  792. Result := FQueueCondVar.WaitFor(FPopTimeout);
  793. //crono.Stop;
  794. //writeln('in: ' + crono.ElapsedTime);
  795. //if result = twaitresult.wrError then result := twaitresult.wrError;
  796. end;
  797. //crono.Stop;
  798. //writeln('out: ' + crono.ElapsedTime);
  799. fQueueLock.Enter;
  800. try
  801. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  802. AItem := fQueue.Extract;
  803. Dec(FQueueSize);
  804. Inc(fTotalItemsPopped);
  805. finally
  806. fQueueLock.Leave;
  807. end;
  808. finally
  809. AQueueSize := fQueueSize;
  810. end;
  811. end;
  812. {$ELSE}
  813. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  814. begin
  815. AItem := Default(T);
  816. fQueueLock.Enter;
  817. try
  818. Result := wrSignaled;
  819. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  820. begin
  821. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  822. end;
  823. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  824. AItem := fQueue.Extract;
  825. if fQueueSize = fQueue.Count then
  826. begin
  827. FQueueCondVar.ReleaseAll;
  828. end;
  829. Dec(FQueueSize);
  830. Inc(fTotalItemsPopped);
  831. finally
  832. AQueueSize := fQueueSize;
  833. fQueueLock.Leave;
  834. end;
  835. end;
  836. {$ENDIF}
  837. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  838. var
  839. LQueueSize: Integer;
  840. begin
  841. Result := PopItem(LQueueSize, AItem);
  842. end;
  843. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  844. begin
  845. PopItem(AQueueSize, Result);
  846. end;
  847. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  848. var
  849. LQueueSize: Integer;
  850. begin
  851. Result := PushItem(AItem, LQueueSize);
  852. end;
  853. {$IFDEF FPC}
  854. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  855. begin
  856. FQueueLock.Enter;
  857. try
  858. if FQueueSize >= fQueue.Count then Grow(10);
  859. Result := wrSignaled;
  860. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  861. //begin
  862. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  863. //end;
  864. if fShutDown or (Result <> wrSignaled) then Exit;
  865. //if fQueueSize = 0 then
  866. //begin
  867. // FQueueCondVar.SetEvent;
  868. //end;
  869. fQueue.Enqueue(AItem);
  870. Inc(FQueueSize);
  871. Inc(fTotalItemsPushed);
  872. finally
  873. AQueueSize := fQueueSize;
  874. FQueueLock.Leave;
  875. //FQueueCondVar.SetEvent;
  876. end;
  877. end;
  878. {$ELSE}
  879. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  880. begin
  881. FQueueLock.Enter;
  882. try
  883. Result := wrSignaled;
  884. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  885. //begin
  886. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  887. //end;
  888. if fShutDown or (Result <> wrSignaled) then Exit;
  889. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  890. fQueue.Enqueue(AItem);
  891. Inc(FQueueSize);
  892. Inc(fTotalItemsPushed);
  893. finally
  894. AQueueSize := fQueueSize;
  895. FQueueLock.Leave;
  896. end;
  897. end;
  898. {$ENDIF}
  899. procedure TThreadedQueueList<T>.DoShutDown;
  900. begin
  901. fShutDown := True;
  902. {$IFDEF FPC}
  903. FQueueCondVar.SetEvent;
  904. {$ELSE}
  905. FQueueCondVar.ReleaseAll;
  906. {$ENDIF}
  907. end;
  908. {$IFNDEF FPC}
  909. { TThreadObjectList<T> }
  910. procedure TThreadObjectList<T>.Add(const Item: T);
  911. begin
  912. LockList;
  913. try
  914. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  915. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  916. finally
  917. UnlockList;
  918. end;
  919. end;
  920. procedure TThreadObjectList<T>.Clear;
  921. begin
  922. LockList;
  923. try
  924. fList.Clear;
  925. finally
  926. UnlockList;
  927. end;
  928. end;
  929. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  930. begin
  931. inherited Create;
  932. fLock := TObject.Create;
  933. fList := TObjectList<T>.Create;
  934. fDuplicates := dupIgnore;
  935. end;
  936. destructor TThreadObjectList<T>.Destroy;
  937. begin
  938. LockList;
  939. try
  940. fList.Free;
  941. inherited Destroy;
  942. finally
  943. UnlockList;
  944. fLock.Free;
  945. end;
  946. end;
  947. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  948. begin
  949. LockList;
  950. try
  951. Result := fList[aIndex];
  952. finally
  953. UnlockList;
  954. end;
  955. end;
  956. function TThreadObjectList<T>.LockList: TObjectList<T>;
  957. begin
  958. System.TMonitor.Enter(fLock);
  959. Result := fList;
  960. end;
  961. procedure TThreadObjectList<T>.Remove(const Item: T);
  962. begin
  963. RemoveItem(Item, TDirection.FromBeginning);
  964. end;
  965. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  966. begin
  967. LockList;
  968. try
  969. fList.RemoveItem(Item, Direction);
  970. finally
  971. UnlockList;
  972. end;
  973. end;
  974. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  975. begin
  976. LockList;
  977. try
  978. fList[aIndex] := aValue;
  979. finally
  980. UnlockList;
  981. end;
  982. end;
  983. procedure TThreadObjectList<T>.UnlockList;
  984. begin
  985. System.TMonitor.Exit(fLock);
  986. end;
  987. {$ENDIF}
  988. { TAnonymousThread }
  989. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  990. begin
  991. fThread := TAdvThread.Create(aProc,aSynchronize);
  992. end;
  993. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  994. begin
  995. Result := TAnonymousThread.Create(aProc,False);
  996. end;
  997. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  998. begin
  999. Result := TAnonymousThread.Create(aProc,True);
  1000. end;
  1001. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  1002. begin
  1003. Result := Self;
  1004. fThread.OnException(aProc);
  1005. end;
  1006. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  1007. begin
  1008. Result := Self;
  1009. fThread.OnTerminate(aProc,False);
  1010. end;
  1011. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  1012. begin
  1013. Result := Self;
  1014. fThread.OnTerminate(aProc,True);
  1015. end;
  1016. procedure TAnonymousThread.Start;
  1017. begin
  1018. fThread.Start;
  1019. end;
  1020. { TTask }
  1021. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  1022. var
  1023. i : Integer;
  1024. begin
  1025. fTaskStatus := TWorkTaskStatus.wtsPending;
  1026. fCustomFaultPolicy := False;
  1027. fNumRetries := 0;
  1028. fExecuteWithSync := False;
  1029. fTerminateWithSync := False;
  1030. fExceptionWithSync := False;
  1031. fFaultControl := TFaultControl.Create;
  1032. fFaultControl.OnRetry := DoRetry;
  1033. fOwnedParams := aOwnedParams;
  1034. fParamList := TParamList.Create(True);
  1035. for i := Low(aParamArray) to High(aParamArray) do
  1036. begin
  1037. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  1038. {$IFDEF FPC}
  1039. fParamList[i].Value._AddRef;
  1040. {$ENDIF}
  1041. end;
  1042. fExecuteProc := aTaskProc;
  1043. fEnabled := False;
  1044. end;
  1045. destructor TTask.Destroy;
  1046. begin
  1047. fFaultControl.Free;
  1048. //free passed params
  1049. fParamList.Free;
  1050. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1051. inherited;
  1052. end;
  1053. procedure TTask.Disable;
  1054. begin
  1055. fEnabled := False;
  1056. end;
  1057. procedure TTask.DoException(aException : Exception);
  1058. begin
  1059. fTaskStatus := TWorkTaskStatus.wtsException;
  1060. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1061. else raise aException;
  1062. end;
  1063. procedure TTask.DoExecute;
  1064. begin
  1065. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1066. DoInitialize;
  1067. repeat
  1068. try
  1069. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1070. fTaskStatus := TWorkTaskStatus.wtsDone;
  1071. fFaultControl.SuccessExecution;
  1072. except
  1073. on E : Exception do
  1074. begin
  1075. fTaskStatus := TWorkTaskStatus.wtsException;
  1076. {$IFNDEF FPC}
  1077. {$IF DELPHIRX10_UP}
  1078. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1079. {$ELSE}
  1080. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1081. {$ENDIF}
  1082. {$ELSE}
  1083. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1084. {$ENDIF}
  1085. end;
  1086. end;
  1087. until not fFaultControl.NeedToRetry;
  1088. end;
  1089. procedure TTask.DoInitialize;
  1090. begin
  1091. try
  1092. fFaultControl.Reset;
  1093. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1094. except
  1095. on E : Exception do
  1096. begin
  1097. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1098. end;
  1099. end;
  1100. end;
  1101. function TTask.Done: Boolean;
  1102. begin
  1103. Result := not fFaultControl.TaskFailed;
  1104. end;
  1105. function TTask.Failed: Boolean;
  1106. begin
  1107. Result := fFaultControl.TaskFailed;
  1108. end;
  1109. function TTask.CircuitBreaked: Boolean;
  1110. begin
  1111. Result := fFaultControl.CircuitBreaked;
  1112. end;
  1113. function TTask.LastException: Exception;
  1114. begin
  1115. Result := fFaultControl.LastException;
  1116. end;
  1117. function TTask.MaxRetries: Integer;
  1118. begin
  1119. Result := fFaultControl.MaxRetries;
  1120. end;
  1121. function TTask.NumRetries: Integer;
  1122. begin
  1123. Result := fFaultControl.NumRetries;
  1124. end;
  1125. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1126. begin
  1127. vStopRetries := False;
  1128. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1129. end;
  1130. procedure TTask.DoTerminate;
  1131. begin
  1132. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1133. end;
  1134. procedure TTask.Enable;
  1135. begin
  1136. fEnabled := True;
  1137. end;
  1138. function TTask.GetIdTask: Int64;
  1139. begin
  1140. Result := fIdTask;
  1141. end;
  1142. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1143. begin
  1144. {$IFDEF FPC}
  1145. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1146. {$ENDIF}
  1147. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1148. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1149. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1150. end;
  1151. procedure TTask.SetIdTask(Value : Int64);
  1152. begin
  1153. fIdTask := Value;
  1154. end;
  1155. function TTask.GetNumWorker: Integer;
  1156. begin
  1157. Result := fNumWorker;
  1158. end;
  1159. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1160. begin
  1161. Result := fParamList[aIndex].Value;
  1162. end;
  1163. function TTask.GetParam(const aName: string): TFlexValue;
  1164. var
  1165. paramvalue : TParamValue;
  1166. begin
  1167. for paramvalue in fParamList do
  1168. begin
  1169. if CompareText(paramvalue.Name,aName) = 0 then
  1170. begin
  1171. Exit(paramvalue.Value)
  1172. end;
  1173. end;
  1174. //if not exists
  1175. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1176. end;
  1177. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1178. begin
  1179. Result := @fParamList[aIndex].Value;
  1180. end;
  1181. function TTask.GetResult: TFlexValue;
  1182. begin
  1183. Result := fResult;
  1184. end;
  1185. function TTask.IsEnabled: Boolean;
  1186. begin
  1187. Result := fEnabled;
  1188. end;
  1189. procedure TTask.SetNumWorker(Value: Integer);
  1190. begin
  1191. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1192. fNumWorker := Value;
  1193. end;
  1194. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1195. begin
  1196. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1197. fParamList[aIndex].Value := Value;
  1198. end;
  1199. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1200. var
  1201. paramvalue : TParamValue;
  1202. begin
  1203. //check if already exists parameter
  1204. for paramvalue in fParamList do
  1205. begin
  1206. if CompareText(paramvalue.Name,aName) = 0 then
  1207. begin
  1208. paramvalue.Value := Value;
  1209. Exit;
  1210. end;
  1211. end;
  1212. //if not exists, create one
  1213. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1214. end;
  1215. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1216. begin
  1217. SetParam(aName,Value,False);
  1218. end;
  1219. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1220. begin
  1221. fFaultControl.MaxRetries := aMaxRetries;
  1222. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1223. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1224. fCustomFaultPolicy := True;
  1225. end;
  1226. procedure TTask.SetResult(aValue: TFlexValue);
  1227. begin
  1228. fResult := aValue;
  1229. end;
  1230. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1231. begin
  1232. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1233. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1234. fFaultControl.WaitTimeMultiplierFactor := 1;
  1235. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1236. fCustomFaultPolicy := True;
  1237. end;
  1238. function TTask.TaskStatus: TWorkTaskStatus;
  1239. begin
  1240. Result := fTaskStatus;
  1241. end;
  1242. { TWorkTask }
  1243. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1244. begin
  1245. fExceptProc := aTaskProc;
  1246. Result := Self;
  1247. end;
  1248. function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  1249. begin
  1250. fExceptionWithSync := True;
  1251. Result := OnException(aTaskProc);
  1252. end;
  1253. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1254. begin
  1255. fInitializeProc := aTaskProc;
  1256. Result := Self;
  1257. end;
  1258. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1259. begin
  1260. fRetryProc := aTaskProc;
  1261. Result := Self;
  1262. end;
  1263. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1264. begin
  1265. fTerminateProc := aTaskProc;
  1266. Result := Self;
  1267. end;
  1268. function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  1269. begin
  1270. fTerminateWithSync := True;
  1271. Result := OnTerminated(aTaskProc);
  1272. end;
  1273. procedure TWorkTask.Run;
  1274. begin
  1275. fEnabled := True;
  1276. end;
  1277. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1278. begin
  1279. Result := Self;
  1280. SetParam(aName,aValue);
  1281. end;
  1282. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1283. begin
  1284. Result := Self;
  1285. SetParam(aName,aValue,aOwned);
  1286. end;
  1287. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1288. begin
  1289. Result := Self;
  1290. SetRetryPolicy(aMaxRetries,0,1);
  1291. end;
  1292. function TWorkTask.RetryForever: IWorkTask;
  1293. begin
  1294. Result := Self;
  1295. SetRetryPolicy(-1,0,1);
  1296. end;
  1297. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1298. begin
  1299. Result := Self;
  1300. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1301. end;
  1302. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1303. begin
  1304. Result := Self;
  1305. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1306. end;
  1307. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1308. begin
  1309. Result := Self;
  1310. SetRetryPolicy(aWaitTimeArray);
  1311. end;
  1312. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1313. begin
  1314. Result := Self;
  1315. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1316. end;
  1317. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1318. begin
  1319. Result := Self;
  1320. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1321. end;
  1322. { TBackgroundTasks }
  1323. procedure TBackgroundTasks.CancelAll;
  1324. begin
  1325. fTaskQueue.Clear;
  1326. end;
  1327. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  1328. begin
  1329. fMaxQueue := 0;
  1330. fConcurrentWorkers := aConcurrentWorkers;
  1331. fInsertTimeout := INFINITE;
  1332. fExtractTimeout := 5000;
  1333. fNumPushedTasks := 0;
  1334. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  1335. end;
  1336. destructor TBackgroundTasks.Destroy;
  1337. begin
  1338. CancelAll;
  1339. fTaskQueue.DoShutDown;
  1340. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1341. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1342. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1343. inherited;
  1344. end;
  1345. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1346. begin
  1347. Result := fTaskQueue.QueueSize;
  1348. end;
  1349. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1350. begin
  1351. Result := AddTask([],False,aTaskProc);
  1352. end;
  1353. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1354. var
  1355. worktask : IWorkTask;
  1356. begin
  1357. if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
  1358. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1359. Inc(fNumPushedTasks);
  1360. worktask.SetIdTask(fNumPushedTasks);
  1361. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1362. begin
  1363. Result := worktask;
  1364. end;
  1365. end;
  1366. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1367. begin
  1368. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1369. TTask(Result).ExecuteWithSync := True;
  1370. end;
  1371. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1372. begin
  1373. Result := AddTask_Sync([],False,aTaskProc);
  1374. end;
  1375. procedure TBackgroundTasks.Start;
  1376. var
  1377. i : Integer;
  1378. worker : TWorker;
  1379. begin
  1380. //create workers
  1381. if fWorkerPool <> nil then fWorkerPool.Free;
  1382. fWorkerPool := TWorkerPool.Create(True);
  1383. for i := 1 to fConcurrentWorkers do
  1384. begin
  1385. worker := TQueueWorker.Create(i,fTaskQueue);
  1386. fWorkerPool.Add(worker);
  1387. worker.Start;
  1388. end;
  1389. end;
  1390. { TWorker }
  1391. constructor TWorker.Create;
  1392. begin
  1393. inherited Create(True);
  1394. fDefaultFaultPolicy := TFaultPolicy.Create;
  1395. fStatus := TWorkerStatus.wsSuspended;
  1396. FreeOnTerminate := False;
  1397. end;
  1398. destructor TWorker.Destroy;
  1399. begin
  1400. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1401. inherited;
  1402. end;
  1403. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1404. begin
  1405. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1406. end;
  1407. procedure TWorker.Execute;
  1408. begin
  1409. end;
  1410. procedure TWorker.ExecuteTask;
  1411. begin
  1412. fCurrentTask.DoExecute;
  1413. end;
  1414. procedure TWorker.TerminateTask;
  1415. begin
  1416. fCurrentTask.DoTerminate;
  1417. end;
  1418. { TSimpleWorker }
  1419. constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
  1420. begin
  1421. inherited Create;
  1422. fRunOnce := aRunOnce;
  1423. fCurrentTask := aTask;
  1424. FreeOnTerminate := True;
  1425. end;
  1426. procedure TSimpleWorker.Execute;
  1427. begin
  1428. fStatus := TWorkerStatus.wsIdle;
  1429. while not Terminated do
  1430. begin
  1431. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1432. try
  1433. fStatus := TWorkerStatus.wsWorking;
  1434. try
  1435. SetFaultPolicy(TTask(fCurrentTask));
  1436. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1437. else fCurrentTask.DoExecute;
  1438. except
  1439. on E : Exception do
  1440. begin
  1441. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1442. else raise ETaskExecutionError.Create(e.Message);
  1443. end;
  1444. end;
  1445. finally
  1446. fStatus := TWorkerStatus.wsIdle;
  1447. try
  1448. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1449. else fCurrentTask.DoTerminate;
  1450. except
  1451. on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1452. end;
  1453. if fRunOnce then Terminate;
  1454. end;
  1455. end;
  1456. fStatus := TWorkerStatus.wsSuspended
  1457. end;
  1458. { TQueueWorker }
  1459. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1460. begin
  1461. inherited Create;
  1462. fNumWorker := aNumWorker;
  1463. fTaskQueue := aTaskQueue;
  1464. end;
  1465. procedure TQueueWorker.Execute;
  1466. begin
  1467. fStatus := TWorkerStatus.wsIdle;
  1468. while not Terminated do
  1469. begin
  1470. fCurrentTask := fTaskQueue.PopItem;
  1471. if fCurrentTask <> nil then
  1472. try
  1473. fStatus := TWorkerStatus.wsWorking;
  1474. try
  1475. fCurrentIdTask := fCurrentTask.GetIdTask;
  1476. SetFaultPolicy(TTask(fCurrentTask));
  1477. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1478. else fCurrentTask.DoExecute;
  1479. except
  1480. on E : Exception do
  1481. begin
  1482. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1483. else raise ETaskExecutionError.Create(e.Message);
  1484. end;
  1485. end;
  1486. finally
  1487. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1488. else fCurrentTask.DoTerminate;
  1489. fStatus := TWorkerStatus.wsIdle;
  1490. end;
  1491. end;
  1492. fStatus := TWorkerStatus.wsSuspended
  1493. end;
  1494. { TScheduledWorker }
  1495. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1496. begin
  1497. inherited Create;
  1498. {$IFNDEF DELPHILINUX}
  1499. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1500. {$ENDIF}
  1501. FreeOnTerminate := True;
  1502. fCurrentTask := aScheduledTask;
  1503. end;
  1504. procedure TScheduledWorker.Execute;
  1505. begin
  1506. fStatus := TWorkerStatus.wsIdle;
  1507. if Assigned(fCurrentTask) then
  1508. begin
  1509. try
  1510. fStatus := TWorkerStatus.wsWorking;
  1511. try
  1512. SetFaultPolicy(TTask(fCurrentTask));
  1513. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1514. else fCurrentTask.DoExecute;
  1515. fStatus := TWorkerStatus.wsIdle;
  1516. except
  1517. on E : Exception do
  1518. begin
  1519. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1520. else raise ETaskExecutionError.Create(e.Message);
  1521. end;
  1522. end;
  1523. finally
  1524. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1525. else fCurrentTask.DoTerminate;
  1526. //check if expired
  1527. if (fCurrentTask as IScheduledTask).IsFinished then
  1528. begin
  1529. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1530. else (fCurrentTask as IScheduledTask).DoExpire;
  1531. end;
  1532. end;
  1533. end;
  1534. fCurrentTask := nil;
  1535. fStatus := TWorkerStatus.wsSuspended;
  1536. end;
  1537. procedure TScheduledWorker.ExpireTask;
  1538. begin
  1539. (fCurrentTask as IScheduledTask).DoExpire;
  1540. end;
  1541. { TScheduledTasks }
  1542. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1543. begin
  1544. Result := AddTask(aTaskName,[],False,aTaskProc);
  1545. end;
  1546. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1547. var
  1548. scheduletask : TScheduledTask;
  1549. begin
  1550. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1551. scheduletask.Name := aTaskName;
  1552. Inc(fNumPushedTasks);
  1553. scheduletask.SetIdTask(fNumPushedTasks);
  1554. fTaskList.Add(scheduletask);
  1555. Result := scheduletask;
  1556. end;
  1557. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1558. begin
  1559. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1560. TTask(Result).ExecuteWithSync := True;
  1561. end;
  1562. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1563. begin
  1564. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1565. end;
  1566. constructor TScheduledTasks.Create;
  1567. begin
  1568. fNumPushedTasks := 0;
  1569. fIsStarted := False;
  1570. fFaultPolicy := TFaultPolicy.Create;
  1571. fTaskList := TScheduledTaskList.Create;
  1572. end;
  1573. destructor TScheduledTasks.Destroy;
  1574. begin
  1575. if Assigned(fScheduler) then
  1576. begin
  1577. fScheduler.Terminate;
  1578. fScheduler.WaitFor;
  1579. fScheduler.Free;
  1580. end;
  1581. if Assigned(fTaskList) then fTaskList.Free;
  1582. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1583. inherited;
  1584. end;
  1585. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1586. begin
  1587. Result := fScheduler.Get(aIdTask);
  1588. end;
  1589. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1590. begin
  1591. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1592. Result := fScheduler.Get(aTaskName);
  1593. end;
  1594. procedure TScheduledTasks.Start;
  1595. begin
  1596. if fIsStarted then Exit;
  1597. if not Assigned(fScheduler) then
  1598. begin
  1599. fScheduler := TScheduler.Create(fTaskList);
  1600. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1601. end;
  1602. fScheduler.Start;
  1603. fIsStarted := True;
  1604. end;
  1605. procedure TScheduledTasks.Stop;
  1606. begin
  1607. if Assigned(fScheduler) then fScheduler.Terminate;
  1608. fIsStarted := False;
  1609. end;
  1610. { TScheduledTask }
  1611. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1612. begin
  1613. Result := Self;
  1614. SetParam(aName,aValue);
  1615. end;
  1616. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1617. begin
  1618. Result := Self;
  1619. SetParam(aName,aValue);
  1620. end;
  1621. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1622. begin
  1623. Result := Self;
  1624. ClearSchedule;
  1625. fScheduleMode := TScheduleMode.smRunOnce;
  1626. fStartDate := aStartDate;
  1627. fNextExecution := aStartDate;
  1628. end;
  1629. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1630. begin
  1631. Result := Self;
  1632. ClearSchedule;
  1633. fScheduleMode := TScheduleMode.smRunOnce;
  1634. fStartDate := IncMinute(Now(),aMinutes);
  1635. fNextExecution := fStartDate;
  1636. end;
  1637. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1638. begin
  1639. Result := Self;
  1640. ClearSchedule;
  1641. fScheduleMode := TScheduleMode.smRunOnce;
  1642. fStartDate := IncSecond(Now(),aSeconds);
  1643. fNextExecution := fStartDate;
  1644. end;
  1645. function TScheduledTask.StartNow: IScheduledTask;
  1646. begin
  1647. Result := Self;
  1648. ClearSchedule;
  1649. fScheduleMode := TScheduleMode.smRunOnce;
  1650. fStartDate := Now();
  1651. fNextExecution := fStartDate;
  1652. end;
  1653. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1654. begin
  1655. Result := Self;
  1656. ClearSchedule;
  1657. fScheduleMode := TScheduleMode.smRunOnce;
  1658. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1659. fNextExecution := fStartDate;
  1660. end;
  1661. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1662. begin
  1663. Result := Self;
  1664. ClearSchedule;
  1665. fScheduleMode := TScheduleMode.smRunOnce;
  1666. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1667. fNextExecution := fStartDate;
  1668. end;
  1669. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1670. begin
  1671. Result := Self;
  1672. ClearSchedule;
  1673. fScheduleMode := TScheduleMode.smRunOnce;
  1674. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1675. fNextExecution := fStartDate;
  1676. end;
  1677. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1678. begin
  1679. Result := Self;
  1680. SetRetryPolicy(aMaxRetries,0,1);
  1681. end;
  1682. function TScheduledTask.RetryForever: IScheduledTask;
  1683. begin
  1684. Result := Self;
  1685. SetRetryPolicy(-1,0,1);
  1686. end;
  1687. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1688. begin
  1689. Result := Self;
  1690. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1691. end;
  1692. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1693. begin
  1694. Result := Self;
  1695. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1696. end;
  1697. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1698. begin
  1699. Result := Self;
  1700. SetRetryPolicy(aWaitTimeArray);
  1701. end;
  1702. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1703. begin
  1704. Result := Self;
  1705. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1706. end;
  1707. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1708. begin
  1709. Result := Self;
  1710. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1711. end;
  1712. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1713. begin
  1714. if fStartDate = 0.0 then ClearSchedule;
  1715. fScheduleMode := TScheduleMode.smRepeatMode;
  1716. fTimeMeasure := aTimeMeasure;
  1717. fTimeInterval := aInterval;
  1718. if fStartDate < Now() then fStartDate := Now();
  1719. fNextExecution := fStartDate;
  1720. fEnabled := True;
  1721. end;
  1722. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1723. begin
  1724. if fStartDate = 0.0 then ClearSchedule;
  1725. fScheduleMode := TScheduleMode.smRepeatMode;
  1726. fTimeMeasure := aTimeMeasure;
  1727. fTimeInterval := aInterval;
  1728. if fStartDate < Now() then fStartDate := Now();
  1729. fExpirationDate := aEndTime;
  1730. fNextExecution := fStartDate;
  1731. fEnabled := True;
  1732. end;
  1733. procedure TScheduledTask.RepeatEveryDay;
  1734. begin
  1735. RepeatEvery(1,tmDays);
  1736. end;
  1737. procedure TScheduledTask.RepeatEveryWeek;
  1738. begin
  1739. RepeatEvery(7,tmDays);
  1740. end;
  1741. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1742. begin
  1743. if fStartDate = 0.0 then ClearSchedule;
  1744. fScheduleMode := TScheduleMode.smRepeatMode;
  1745. fTimeMeasure := aTimeMeasure;
  1746. fTimeInterval := aInterval;
  1747. if fStartDate < Now() then fStartDate := Now();
  1748. fExpirationTimes := aRepeatTimes;
  1749. fNextExecution := fStartDate;
  1750. fEnabled := True;
  1751. end;
  1752. procedure TScheduledTask.RunOnce;
  1753. begin
  1754. fScheduleMode := TScheduleMode.smRunOnce;
  1755. if fStartDate < Now() then fStartDate := Now();
  1756. fNextExecution := fStartDate;
  1757. fEnabled := True;
  1758. end;
  1759. procedure TScheduledTask.Cancel;
  1760. begin
  1761. fFinished := True;
  1762. end;
  1763. function TScheduledTask.CheckSchedule: Boolean;
  1764. begin
  1765. Result := False;
  1766. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1767. if fScheduleMode = TScheduleMode.smRunOnce then
  1768. begin
  1769. //if start date reached
  1770. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1771. begin
  1772. fLastExecution := Now();
  1773. Inc(fExecutionTimes);
  1774. fFinished := True;
  1775. Result := True;
  1776. end;
  1777. end
  1778. else
  1779. begin
  1780. //if next execution reached
  1781. if Now() >= fNextExecution then
  1782. begin
  1783. //check expiration limits
  1784. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1785. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1786. begin
  1787. fFinished := True;
  1788. Exit;
  1789. end;
  1790. //calculate next execution
  1791. case fTimeMeasure of
  1792. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1793. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1794. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1795. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1796. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1797. end;
  1798. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1799. else
  1800. begin
  1801. fLastExecution := Now();
  1802. Inc(fExecutionTimes);
  1803. Result := True;
  1804. end;
  1805. end;
  1806. end;
  1807. end;
  1808. procedure TScheduledTask.ClearSchedule;
  1809. begin
  1810. inherited;
  1811. fFinished := False;
  1812. fStartDate := 0.0;
  1813. fLastExecution := 0.0;
  1814. fNextExecution := 0.0;
  1815. fExpirationDate := 0.0;
  1816. fScheduleMode := TScheduleMode.smRunOnce;
  1817. fTimeMeasure := TTimeMeasure.tmSeconds;
  1818. fTimeInterval := 0;
  1819. end;
  1820. procedure TScheduledTask.DoExpire;
  1821. begin
  1822. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1823. fEnabled := False;
  1824. end;
  1825. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1826. begin
  1827. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1828. end;
  1829. function TScheduledTask.GetTaskName: string;
  1830. begin
  1831. Result := fName;
  1832. end;
  1833. function TScheduledTask.IsFinished: Boolean;
  1834. begin
  1835. Result := fFinished;
  1836. end;
  1837. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1838. begin
  1839. fExceptProc := aTaskProc;
  1840. Result := Self;
  1841. end;
  1842. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1843. begin
  1844. Result := OnException(aTaskProc);
  1845. TTask(Result).ExceptionWithSync := True;
  1846. end;
  1847. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1848. begin
  1849. fRetryProc := aTaskProc;
  1850. Result := Self;
  1851. end;
  1852. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1853. begin
  1854. fExpiredProc := aTaskProc;
  1855. Result := Self;
  1856. end;
  1857. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1858. begin
  1859. Result := OnExpired(aTaskProc);
  1860. TScheduledTask(Result).ExpireWithSync := True;
  1861. end;
  1862. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1863. begin
  1864. fInitializeProc := aTaskProc;
  1865. Result := Self;
  1866. end;
  1867. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1868. begin
  1869. fTerminateProc := aTaskProc;
  1870. Result := Self;
  1871. end;
  1872. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1873. begin
  1874. Result := OnTerminated(aTaskProc);
  1875. TTask(Result).TerminateWithSync := True;
  1876. end;
  1877. { TScheduler }
  1878. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1879. begin
  1880. inherited Create(True);
  1881. FreeOnTerminate := False;
  1882. fListLock := TCriticalSection.Create;
  1883. fRemoveTaskAfterExpiration := False;
  1884. fTaskList := aTaskList;
  1885. {$IFDEF FPC}
  1886. fCondVar := TSimpleEvent.Create;
  1887. {$ELSE}
  1888. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1889. {$ENDIF}
  1890. end;
  1891. destructor TScheduler.Destroy;
  1892. begin
  1893. fCondVar.SetEvent;
  1894. fCondVar.Free;
  1895. fTaskList := nil;
  1896. fListLock.Free;
  1897. inherited;
  1898. end;
  1899. procedure TScheduler.Execute;
  1900. var
  1901. task : IScheduledTask;
  1902. worker : TScheduledWorker;
  1903. numworker : Int64;
  1904. begin
  1905. numworker := 0;
  1906. while not Terminated do
  1907. begin
  1908. fListLock.Enter;
  1909. try
  1910. for task in fTaskList do
  1911. begin
  1912. if (task.IsEnabled) and (not task.IsFinished) then
  1913. begin
  1914. if task.CheckSchedule then
  1915. begin
  1916. Inc(numworker);
  1917. worker := TScheduledWorker.Create(numworker,task);
  1918. worker.Start;
  1919. end;
  1920. end
  1921. else
  1922. begin
  1923. if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then fTaskList.Remove(task);
  1924. end;
  1925. end;
  1926. task := nil;
  1927. finally
  1928. fListLock.Leave;
  1929. end;
  1930. fCondVar.WaitFor(250);
  1931. end;
  1932. end;
  1933. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1934. begin
  1935. Result := fTaskList.Add(aTask);
  1936. end;
  1937. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1938. var
  1939. task : IScheduledTask;
  1940. begin
  1941. fListLock.Enter;
  1942. try
  1943. for task in fTaskList do
  1944. begin
  1945. if task.IdTask = aIdTask then Exit(task);
  1946. end;
  1947. finally
  1948. fListLock.Leave;
  1949. end;
  1950. end;
  1951. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1952. var
  1953. task : IScheduledTask;
  1954. begin
  1955. fListLock.Enter;
  1956. try
  1957. for task in fTaskList do
  1958. begin
  1959. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1960. end;
  1961. finally
  1962. fListLock.Leave;
  1963. end;
  1964. end;
  1965. { TAdvThread }
  1966. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1967. begin
  1968. inherited Create(True);
  1969. FreeOnTerminate := True;
  1970. fExecuteWithSync := aSynchronize;
  1971. fExecuteProc := aProc;
  1972. end;
  1973. procedure TAdvThread.DoExecute;
  1974. begin
  1975. try
  1976. if Assigned(fExecuteProc) then fExecuteProc;
  1977. except
  1978. on E : Exception do
  1979. begin
  1980. {$IFNDEF FPC}
  1981. {$IF DELPHIRX10_UP}
  1982. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  1983. {$ELSE}
  1984. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1985. {$ENDIF}
  1986. {$ELSE}
  1987. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1988. {$ENDIF}
  1989. else raise e;
  1990. end;
  1991. end;
  1992. end;
  1993. procedure TAdvThread.CallToTerminate;
  1994. begin
  1995. if Assigned(fTerminateProc) then fTerminateProc;
  1996. end;
  1997. procedure TAdvThread.DoTerminate;
  1998. begin
  1999. if fTerminateWithSync then Synchronize(CallToTerminate)
  2000. else CallToTerminate;
  2001. end;
  2002. procedure TAdvThread.Execute;
  2003. begin
  2004. if fExecuteWithSync then Synchronize(DoExecute)
  2005. else DoExecute;
  2006. end;
  2007. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  2008. begin
  2009. fExceptionProc := aProc;
  2010. end;
  2011. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  2012. begin
  2013. fTerminateWithSync := aSynchronize;
  2014. fTerminateProc := aProc;
  2015. end;
  2016. { TRunTask }
  2017. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  2018. begin
  2019. Result := Execute([],False,aTaskProc);
  2020. end;
  2021. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  2022. begin
  2023. Result := Execute_Sync([],False,aTaskProc);
  2024. end;
  2025. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2026. var
  2027. task : TWorkTask;
  2028. worker : TSimpleWorker;
  2029. begin
  2030. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2031. task.ExecuteWithSync := False;
  2032. Result := task;
  2033. worker := TSimpleWorker.Create(task);
  2034. worker.Start;
  2035. end;
  2036. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2037. var
  2038. task : TWorkTask;
  2039. worker : TSimpleWorker;
  2040. begin
  2041. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2042. task.ExecuteWithSync := True;
  2043. Result := task;
  2044. worker := TSimpleWorker.Create(task);
  2045. worker.Start;
  2046. end;
  2047. { TParamValue }
  2048. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  2049. begin
  2050. inherited Create;
  2051. fName := aName;
  2052. fValue := aValue;
  2053. fOwned := aOwnedValue;
  2054. end;
  2055. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  2056. begin
  2057. inherited Create;
  2058. fName := aName;
  2059. fValue := aValue;
  2060. fOwned := aOwnedValue;
  2061. end;
  2062. constructor TParamValue.Create;
  2063. begin
  2064. fName := '';
  2065. fOwned := False;
  2066. end;
  2067. destructor TParamValue.Destroy;
  2068. begin
  2069. {$IFDEF FPC}
  2070. fValue._Release;
  2071. {$ENDIF}
  2072. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  2073. inherited;
  2074. end;
  2075. { TBackgroundWorkers }
  2076. constructor TBackgroundWorkers.Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  2077. begin
  2078. fConcurrentWorkers := aConcurrentWorkers;
  2079. fWorkerExecuteProc := aWorkerProc;
  2080. fWorkerPool := TWorkerPool.Create(True);
  2081. end;
  2082. destructor TBackgroundWorkers.Destroy;
  2083. begin
  2084. fWorkerPool.Free;
  2085. inherited;
  2086. end;
  2087. procedure TBackgroundWorkers.Start;
  2088. var
  2089. i : Integer;
  2090. worker : TWorker;
  2091. task : IWorkTask;
  2092. begin
  2093. for i := 1 to fConcurrentWorkers do
  2094. begin
  2095. task := TWorkTask.Create([],False,fWorkerExecuteProc)
  2096. .OnInitialize(fWorkerInitProc)
  2097. .OnRetry(fWorkerRetryProc)
  2098. .OnException(fWorkerExceptionProc)
  2099. .OnTerminated(fWorkerTerminateProc);
  2100. task.NumWorker := i;
  2101. task.Run;
  2102. worker := TSimpleWorker.Create(task,False);
  2103. fWorkerPool.Add(worker);
  2104. worker.Start;
  2105. end;
  2106. end;
  2107. procedure TBackgroundWorkers.Stop;
  2108. var
  2109. worker : TWorker;
  2110. begin
  2111. for worker in fWorkerPool do
  2112. begin
  2113. worker.Terminate;
  2114. worker.WaitFor;
  2115. fWorkerPool.Remove(worker);
  2116. end;
  2117. end;
  2118. function TBackgroundWorkers.OnException(aTaskProc: TTaskExceptionProc): TBackgroundWorkers;
  2119. begin
  2120. Result := Self;
  2121. fWorkerExceptionProc := aTaskProc;
  2122. end;
  2123. function TBackgroundWorkers.OnInitialize(aTaskProc: TTaskProc): TBackgroundWorkers;
  2124. begin
  2125. Result := Self;
  2126. fWorkerInitProc := aTaskProc;
  2127. end;
  2128. function TBackgroundWorkers.OnRetry(aTaskProc: TTaskRetryProc): TBackgroundWorkers;
  2129. begin
  2130. Result := Self;
  2131. fWorkerRetryProc := aTaskProc;
  2132. end;
  2133. function TBackgroundWorkers.OnTerminated(aTaskProc: TTaskProc): TBackgroundWorkers;
  2134. begin
  2135. Result := Self;
  2136. fWorkerTerminateProc := aTaskProc;
  2137. end;
  2138. function TBackgroundWorkers.Retry(aMaxRetries: Integer): TBackgroundWorkers;
  2139. begin
  2140. Result := Self;
  2141. SetRetryPolicy(aMaxRetries,0,1);
  2142. end;
  2143. function TBackgroundWorkers.RetryForever: TBackgroundWorkers;
  2144. begin
  2145. Result := Self;
  2146. SetRetryPolicy(-1,0,1);
  2147. end;
  2148. procedure TBackgroundWorkers.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double);
  2149. begin
  2150. fFaultPolicy.MaxRetries := aMaxRetries;
  2151. fFaultPolicy.WaitTimeBetweenRetries := aWaitTimeBetweenRetriesMS;
  2152. fFaultPolicy.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  2153. end;
  2154. function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  2155. begin
  2156. end;
  2157. function TBackgroundWorkers.WaitAndRetry(aWaitTimeArray: TArray<Integer>): TBackgroundWorkers;
  2158. begin
  2159. end;
  2160. function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  2161. begin
  2162. end;
  2163. function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  2164. begin
  2165. end;
  2166. function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  2167. begin
  2168. end;
  2169. end.