Quick.Threads.pas 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466
  1. { ***************************************************************************
  2. Copyright (c) 2016-2021 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 08/03/2021
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  // Generic queue class. Declared state: a dynamic array of T, size and offset
  // counters, a TCriticalSection and a condition variable (a TEventObject when
  // compiled with FPC). Only the declaration is visible in this part of the
  // unit; blocking and timeout behaviour is defined in the implementation section.
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T; // item storage
  46. FQueueSize, FQueueOffset: Integer; // FQueueSize is exposed through the QueueSize property
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean; // exposed through the ShutDown property
  54. FPushTimeout, FPopTimeout: Cardinal; // presumably milliseconds, as the defaults are INFINITE - TODO confirm
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal; // counters exposed through the TotalItems* properties
  56. public
  // Defaults: depth 16 and INFINITE for both timeouts.
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  // NOTE(review): presumably enlarges the storage by ADelta slots - confirm in implementation.
  59. procedure Grow(ADelta: Integer);
  // Push overloads return a TWaitResult; the second also has a var AQueueSize
  // parameter (presumably set to the resulting queue size - TODO confirm).
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  // Pop overloads: two return the item directly, two return a TWaitResult and
  // hand the item back through the var AItem parameter.
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  // Read-only views of the private fields above.
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  // Generic queue class whose items are held in a TQueue<T>. It declares the
  // same public members as TThreadedQueueCS<T> except that no Clear method is
  // declared here. Behaviour is defined in the implementation section.
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>; // item storage
  76. fQueueSize : Integer; // exposed through the QueueSize property
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  // Defaults: depth 10 and INFINITE for both timeouts.
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  // Push overloads return a TWaitResult.
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  // Pop overloads: two return the item, two return a TWaitResult with the
  // item passed back through var AItem.
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  // Read-only views of the private fields (Pascal identifiers are case-insensitive,
  // so FQueueSize here is the fQueueSize field above).
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  // Object list with LockList/UnlockList accessors (Delphi only; the enclosing
  // conditional excludes FPC). T is constrained to class types.
  // NOTE(review): the class derives from TList<T> and also holds its own
  // TObjectList<T> in fList. Add, Clear, Remove and RemoveItem are declared
  // without override or reintroduce, so they appear to hide the inherited
  // TList<T> members of the same name - confirm this is intended.
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>; // wrapped list, returned by LockList
  108. fLock: TObject; // NOTE(review): presumably the monitor object guarding fList - confirm
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  // OwnedObjects: presumably forwarded to the TObjectList<T> constructor - TODO confirm.
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  // Default indexed property backed by GetItem/SetItem.
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  // LockList returns the wrapped list; callers are expected to pair it with
  // UnlockList (inferred from the names - confirm in implementation).
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  // Callback types. Under Delphi they are anonymous-method references and
  // TAnonProc aliases TProc; under FPC they are method pointers (of object),
  // TProc is declared locally and no TAnonProc alias is declared.
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  // Three-state status enumeration: running, done, exception.
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  // TThread descendant that stores an execute callback, an exception callback
  // and a terminate callback, plus two flags for the *WithSync variants.
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  // NOTE(review): the two flags presumably select whether the callback is
  // run through TThread.Synchronize - confirm in implementation.
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  // aProc: callback for the thread; aSynchronize: sync flag (see note above).
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  // Setters for the exception and terminate callbacks.
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  // Interface for configuring and starting an anonymous thread. The On*
  // functions return IAnonymousThread, which allows calls to be chained.
  // No GUID is declared for this interface.
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  // Reference-counted (TInterfacedObject) implementation of IAnonymousThread
  // that holds a TAdvThread. The constructor is declared in the private
  // section; the public entry points are the Execute / Execute_Sync class functions.
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread; // NOTE(review): ownership and lifetime of this thread are not visible here - confirm
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  // Class-level factories returning the interface.
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  // Callback registration; each returns IAnonymousThread.
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  // Named parameter holder: a name, a TFlexValue and an Owned flag.
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  // NOTE(review): presumably tells the destructor whether to free an object
  // held in fValue - confirm in implementation.
  172. fOwned : Boolean;
  173. public
  // Three constructors: parameterless, from a TFlexValue, and from a TVarRec
  // (the element type of an array of const).
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  // Read/write access to the three fields.
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  // Object list of TParamValue items.
  182. TParamList = TObjectList<TParamValue>;
  // Task life-cycle states: pending, assigned, running, done, exception.
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  // Scheduling mode: run once or repeat.
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  // Time units accepted by the RepeatEvery overloads of IScheduledTask.
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  // Exception classes of this unit; all derive directly from Exception.
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  // Base task interface: parameter access by index or name, worker number,
  // task id, result value, life-cycle procedures and status queries.
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  // Parameter accessors. GetParam2 returns a pointer (PFlexValue) instead of a value.
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue;
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  // Task ids are Int64.
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  // Life-cycle procedures.
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. procedure Enable;
  209. procedure Disable;
  // Under Delphi both Param overloads are default array properties; under FPC
  // the by-name property is called ParamByName and only it is marked default.
  210. {$IFNDEF FPC}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  212. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ELSE}
  214. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  215. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  216. {$ENDIF}
  217. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  218. property Result : TFlexValue read GetResult write SetResult;
  // Read-only as a property; SetIdTask above is still part of the interface.
  219. property IdTask : Int64 read GetIdTask;
  // Status and retry queries.
  220. function Done : Boolean;
  221. function Failed : Boolean;
  222. function NumRetries : Integer;
  223. function MaxRetries : Integer;
  224. function LastException : Exception;
  225. function CircuitBreaked : Boolean;
  226. function IsEnabled : Boolean;
  227. end;
  // Task callback types: anonymous-method references under Delphi, method
  // pointers (of object) under FPC. Each receives the ITask; the exception
  // and retry variants also receive the Exception, and the retry variant has
  // a var aStopRetries flag the callback can set.
  228. {$IFNDEF FPC}
  229. TTaskProc = reference to procedure(task : ITask);
  230. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  231. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  232. {$ELSE}
  233. TTaskProc = procedure(task : ITask) of object;
  234. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  235. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  236. {$ENDIF}
  // ITask extension for queued work tasks. Every configuration function
  // returns IWorkTask so that calls can be chained; Run is the only procedure.
  // NOTE(review): no GUID is declared here, unlike ITask and IScheduledTask -
  // confirm whether interface queries for IWorkTask are needed anywhere.
  237. IWorkTask = interface(ITask)
  // Callback registration. The _Sync variants presumably run the callback
  // synchronized with the main thread - TODO confirm in implementation.
  238. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  239. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  240. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  241. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  242. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  243. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  // Retry policy configuration; wait times are named in milliseconds (MS).
  244. function Retry(aMaxRetries : Integer) : IWorkTask;
  245. function RetryForever : IWorkTask;
  246. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  247. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  248. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  249. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  250. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  // Named parameter assignment, with or without an explicit aOwned flag.
  251. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  252. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  253. procedure Run;
  254. end;
  // ITask extension for scheduled tasks. Callback, retry, start-time and
  // parameter functions return IScheduledTask so that calls can be chained;
  // RunOnce and the Repeat* procedures return nothing.
  255. IScheduledTask = interface(ITask)
  256. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  // Callback registration. The _Sync variants presumably run the callback
  // synchronized with the main thread - TODO confirm in implementation.
  257. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  259. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  260. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  261. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  262. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  263. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  264. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  // Retry policy configuration; wait times are named in milliseconds (MS).
  265. function Retry(aMaxRetries : Integer) : IScheduledTask;
  266. function RetryForever : IScheduledTask;
  267. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  268. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  269. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  270. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  271. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  // Schedule evaluation and expiry.
  272. function CheckSchedule : Boolean;
  273. procedure DoExpire;
  274. function GetTaskName : string;
  // Start-time selection. aSecond defaults to 0 in the two *At functions.
  275. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  276. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  277. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  278. function StartOnDayChange : IScheduledTask;
  279. function StartNow : IScheduledTask;
  280. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  281. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // Repetition selection: once, every aInterval units (optionally bounded by
  // an end time or a repeat count), daily or weekly.
  282. procedure RunOnce;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  284. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  285. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  286. procedure RepeatEveryDay;
  287. procedure RepeatEveryWeek;
  288. function IsFinished : Boolean;
  289. procedure Cancel;
  // Read-only task name.
  290. property Name : string read GetTaskName;
  // Named parameter assignment, with or without an explicit aOwned flag.
  291. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  292. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  293. end;
  // Reference-counted (TInterfacedObject) base class implementing ITask. It
  // holds the parameter list, the callbacks, the status and sync flags, a
  // TFaultControl and the task result.
  294. TTask = class(TInterfacedObject,ITask)
  295. private
  296. fIdTask : Int64;
  297. fNumWorker : Integer;
  298. fNumRetries : Integer;
  299. fParamList : TParamList;
  // Callbacks.
  300. fInitializeProc : TTaskProc;
  301. fExecuteProc : TTaskProc;
  302. fExceptProc : TTaskExceptionProc;
  303. fTerminateProc : TTaskProc;
  304. fExpiredProc : TTaskProc;
  305. fTaskStatus : TWorkTaskStatus;
  306. fOwnedParams : Boolean;
  307. fEnabled : Boolean;
  // *WithSync flags, exposed through the protected properties below.
  308. fExecuteWithSync : Boolean;
  309. fExceptionWithSync : Boolean;
  310. fRetryProc : TTaskRetryProc;
  311. fTerminateWithSync : Boolean;
  312. fFaultControl : TFaultControl;
  313. fCustomFaultPolicy : Boolean;
  314. fResult : TFlexValue;
  // ITask accessor implementations. The three-argument SetParam overload with
  // aOwned is declared on the class only, not on ITask.
  315. function GetParam(aIndex : Integer) : TFlexValue; overload;
  316. function GetParam(const aName : string) : TFlexValue; overload;
  317. function GetParam2(aIndex : Integer) : PFlexValue;
  318. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  319. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  320. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  321. procedure DoInitialize;
  322. procedure DoExecute;
  323. procedure DoException(aException : Exception);
  324. procedure DoTerminate;
  325. function GetNumWorker : Integer;
  326. procedure SetNumWorker(Value : Integer);
  327. function GetIdTask : Int64;
  328. procedure SetIdTask(Value : Int64);
  329. function GetResult : TFlexValue;
  330. procedure SetResult(aValue : TFlexValue);
  331. protected
  // Field access for descendants.
  332. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  333. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  334. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  335. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  336. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  // Retry and fault-policy helpers for descendants.
  337. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  338. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  339. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  340. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  341. public
  // Virtual constructor: parameters as an array of const, the owned-params
  // flag and the task callback.
  342. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  343. destructor Destroy; override;
  344. property IdTask : Int64 read GetIdTask;
  345. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  // Status and retry queries (ITask).
  346. function IsEnabled : Boolean;
  347. function TaskStatus : TWorkTaskStatus;
  348. function Done : Boolean;
  349. function Failed : Boolean;
  350. function NumRetries : Integer;
  351. function MaxRetries : Integer;
  352. function LastException : Exception;
  353. function CircuitBreaked : Boolean;
  354. procedure Disable;
  355. procedure Enable;
  356. end;
  // TTask descendant implementing IWorkTask. The exception, terminate and
  // retry callback setters and Run are virtual; the remaining methods are static.
  357. TWorkTask = class(TTask,IWorkTask)
  358. public
  // Callback registration; each returns IWorkTask.
  359. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  360. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  361. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  362. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  363. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  364. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  // Retry policy configuration.
  365. function Retry(aMaxRetries : Integer) : IWorkTask;
  366. function RetryForever : IWorkTask;
  367. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  368. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  369. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  370. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  371. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  // Named parameter assignment.
  372. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  373. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  374. procedure Run; virtual;
  375. end;
  // Queue of IWorkTask references, specialising TThreadedQueueCS<T>.
  376. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  // TTask descendant implementing IScheduledTask. It adds the schedule state:
  // name, mode, interval and unit, start/last/next execution dates,
  // expiration date or count, and a finished flag.
  377. TScheduledTask = class(TTask,IScheduledTask)
  378. private
  379. fName : string;
  380. fExecutionTimes : Integer;
  381. fScheduleMode : TScheduleMode;
  382. fTimeInterval : Integer;
  383. fTimeMeasure : TTimeMeasure;
  384. fStartDate : TDateTime;
  385. fLastExecution : TDateTime;
  386. fNextExecution : TDateTime;
  387. fExpirationDate : TDateTime;
  388. fExpirationTimes : Integer;
  389. fFinished : Boolean;
  390. fExpireWithSync: Boolean;
  391. procedure ClearSchedule;
  392. function CheckSchedule : Boolean;
  393. procedure DoExpire;
  394. function GetTaskName : string;
  // NOTE(review): presumably pairs fTimeMeasure with fTimeInterval - confirm in implementation.
  395. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  396. protected
  397. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  398. public
  // On the class, Name is read/write and bound directly to fName; the
  // interface exposes it read-only through GetTaskName.
  399. property Name : string read fName write fName;
  400. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  401. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  // Callback registration (virtual); each returns IScheduledTask.
  402. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  403. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  404. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  405. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  406. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  407. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  408. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  // Start-time selection.
  409. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  410. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  411. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  412. function StartOnDayChange : IScheduledTask;
  413. function StartNow : IScheduledTask;
  414. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  415. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // Repetition selection.
  416. procedure RunOnce;
  417. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  418. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  419. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  420. procedure RepeatEveryDay;
  421. procedure RepeatEveryWeek;
  // Retry policy configuration.
  422. function Retry(aMaxRetries : Integer) : IScheduledTask;
  423. function RetryForever : IScheduledTask;
  424. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  425. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  426. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  427. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  428. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  // Named parameter assignment.
  429. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  430. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  431. function IsFinished : Boolean;
  432. procedure Cancel;
  433. end;
  // Worker thread states: idle, working, suspended. Uses the same ws prefix
  // as the TThreadWorkStatus enumeration declared earlier in this unit.
  434. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  // Base worker thread (TThread descendant). It holds the current status, the
  // ITask being processed and a default TFaultPolicy; descendants in this
  // unit override Execute.
  435. TWorker = class(TThread)
  436. protected
  437. fStatus : TWorkerStatus; // exposed read-only through Status
  438. fCurrentTask : ITask;
  439. fDefaultFaultPolicy : TFaultPolicy;
  440. procedure ExecuteTask;
  441. procedure TerminateTask;
  442. public
  443. constructor Create;
  444. destructor Destroy; override;
  445. property Status : TWorkerStatus read fStatus;
  // NOTE(review): presumably applies fDefaultFaultPolicy to aTask when the
  // task has no custom policy - confirm in implementation.
  446. procedure SetFaultPolicy(aTask : TTask);
  447. procedure Execute; override;
  448. end;
  // Worker constructed with a single ITask and a run-once flag (default True).
  449. TSimpleWorker = class(TWorker)
  450. private
  451. fRunOnce : Boolean;
  452. public
  453. constructor Create(aTask : ITask; aRunOnce : Boolean = True);
  454. procedure Execute; override;
  455. end;
  // Worker constructed with a worker number and a TTaskQueue reference.
  456. TQueueWorker = class(TWorker)
  457. private
  // NOTE(review): declared as Integer while ITask.GetIdTask returns Int64;
  // if this field is assigned from a task id the value could be truncated -
  // confirm in implementation.
  458. fCurrentIdTask : Integer;
  459. fNumWorker : Integer;
  460. fTaskQueue : TTaskQueue; // NOTE(review): presumably not owned by the worker - confirm
  461. public
  462. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  463. property NumWorker : Integer read fNumWorker;
  464. procedure Execute; override;
  465. end;
  { Worker created for one scheduled task. }
  TScheduledWorker = class(TWorker)
  private
    procedure ExpireTask;
  public
    constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
    procedure Execute; override;
  end;
  { Object list of worker threads. }
  TWorkerPool = TObjectList<TWorker>;
  { Class-level helpers that build an IWorkTask from a task procedure,
    optionally with a parameter array; each has a _Sync variant. }
  TRunTask = class
  public
    // plain variants
    class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
    class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
    // _Sync variants
    class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
    class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  end;
  { Task queue served by a pool of queue workers. Tasks are added with
    AddTask / AddTask_Sync; Start creates and starts the workers. }
  TBackgroundTasks = class
  private
    fMaxQueue : Integer;
    fWorkerPool : TWorkerPool;
    fConcurrentWorkers : Integer;
    fInsertTimeout : Cardinal;
    fExtractTimeout : Cardinal;
    fTaskQueue : TTaskQueue;
    fNumPushedTasks : Int64;
    function GetTaskQueue : Cardinal;
  public
    constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
    destructor Destroy; override;
    function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    procedure Start;
    procedure CancelAll;
    // a MaxQueue of 0 disables the queue-size limit checked by AddTask
    property MaxQueue : Integer read fMaxQueue write fMaxQueue;
    property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
    property TaskQueued : Cardinal read GetTaskQueue;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  end;
  { List of scheduled task interfaces. }
  TScheduledTaskList = TList<IScheduledTask>;
  { Scheduler thread working over a TScheduledTaskList supplied at construction. }
  TScheduler = class(TThread)
  private
    fListLock : TCriticalSection;
    fCondVar : TSimpleEvent;
    fTaskList : TScheduledTaskList;
    fRemoveTaskAfterExpiration : Boolean;
  public
    constructor Create(aTaskList : TScheduledTaskList);
    destructor Destroy; override;
    procedure Execute; override;
    function Add(aTask : TScheduledTask) : Integer;
    // lookup by task id or by task name
    function Get(aIdTask : Int64) : IScheduledTask; overload;
    function Get(const aTaskName : string) : IScheduledTask; overload;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  end;
  { Front end for scheduled tasks: owns a task list and a TScheduler and
    exposes AddTask / GetTask plus Start / Stop. }
  TScheduledTasks = class
  private
    fTaskList : TScheduledTaskList;
    fScheduler : TScheduler;
    fNumPushedTasks : Int64;
    fRemoveTaskAfterExpiration : Boolean;
    fIsStarted : Boolean;
    fFaultPolicy : TFaultPolicy;
  public
    constructor Create;
    destructor Destroy; override;
    function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function GetTask(aIdTask : Int64) : IScheduledTask; overload;
    function GetTask(const aTaskName : string) : IScheduledTask; overload;
    procedure Start;
    procedure Stop;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    property IsStarted : Boolean read fIsStarted;
    property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  end;
  { Pool of workers sharing one worker procedure. The On* and retry
    methods return TBackgroundWorkers so calls can be chained. }
  TBackgroundWorkers = class
  private
    fWorkerPool : TWorkerPool;
    fConcurrentWorkers : Integer;
    fWorkerInitProc : TTaskProc;
    fWorkerExecuteProc : TTaskProc;
    fWorkerRetryProc : TTaskRetryProc;
    fWorkerExceptionProc : TTaskExceptionProc;
    fWorkerTerminateProc : TTaskProc;
    fMaxRetries : Integer;
    fFaultPolicy : TFaultPolicy;
    procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  public
    constructor Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
    destructor Destroy; override;
    // event callbacks
    function OnInitialize(aTaskProc : TTaskProc) : TBackgroundWorkers;
    function OnException(aTaskProc : TTaskExceptionProc) : TBackgroundWorkers;
    function OnRetry(aTaskProc : TTaskRetryProc) : TBackgroundWorkers;
    function OnTerminated(aTaskProc : TTaskProc) : TBackgroundWorkers;
    // retry configuration
    function Retry(aMaxRetries : Integer) : TBackgroundWorkers;
    function RetryForever : TBackgroundWorkers;
    function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
    function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : TBackgroundWorkers; overload;
    function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
    function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
    function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
    procedure Start;
    procedure Stop;
    property ConcurrentWorkers : Integer read fConcurrentWorkers;
  end;
  577. implementation
  578. { TThreadedQueueCS<T> }
  { Empties the queue and wakes every thread waiting on the condition variable.
    The buffer keeps its capacity and the ring-buffer bookkeeping (size and
    offset) is reset, so PopItem / PushItem keep working after a Clear. }
  procedure TThreadedQueueCS<T>.Clear;
  var
    obj : T;
    capacity : Integer;
  begin
    FQueueLock.Enter;
    try
      for obj in FQueue do
      begin
        if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
      end;
      capacity := Length(FQueue);
      // drop every slot (releases managed items), then restore the capacity
      SetLength(FQueue,0);
      SetLength(FQueue,capacity);
      // without this reset PopItem would index stale positions of the buffer
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Creates the queue with AQueueDepth slots and the given push/pop timeouts.
    Raises Exception when AQueueDepth is lower than 10. }
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
    SetLength(FQueue, AQueueDepth);
    // synchronization primitives
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  { Flags the queue as shut down, releasing waiters, then frees the lock and
    the condition variable. }
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  { Extends the ring buffer by ADelta slots and wakes waiting threads.
    When the stored items wrap around the end of the old buffer, the wrapped
    head items are moved into the new space so the logical order
    (FQueueOffset + n) mod Length(FQueue) stays valid for PopItem / PushItem. }
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    i, oldLen, newLen, wrapped : Integer;
  begin
    FQueueLock.Enter;
    try
      oldLen := Length(FQueue);
      SetLength(FQueue, oldLen + ADelta);
      newLen := Length(FQueue);
      if (oldLen > 0) and (newLen > oldLen) then
      begin
        // number of items stored at the front of the buffer because of wrap-around
        wrapped := FQueueOffset + FQueueSize - oldLen;
        for i := 0 to wrapped - 1 do
        begin
          // the destination is either a fresh slot or a front slot already vacated
          FQueue[(oldLen + i) mod newLen] := FQueue[i];
          FQueue[i] := Default(T);
        end;
      end;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Pops one item, discarding the wait result and the remaining queue size. }
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  { Waits until an item is available, the queue is shut down or the wait fails,
    then extracts the item at the current offset.
    AItem is Default(T) when nothing was extracted; AQueueSize always receives
    the queue size on exit. Returns the result of the last wait (wrSignaled
    when no wait was needed). }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPopTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;
      if (Result = wrSignaled) and not (FShutDown and (FQueueSize = 0)) then
      begin
        AItem := FQueue[FQueueOffset];
        FQueue[FQueueOffset] := Default(T);
        // the queue was full: signal that a slot is about to be free
        if FQueueSize = Length(FQueue) then
        begin
          {$IFDEF FPC}
          FQueueCondVar.SetEvent;
          {$ELSE}
          FQueueCondVar.ReleaseAll;
          {$ENDIF}
        end;
        Dec(FQueueSize);
        Inc(FTotalItemsPopped);
        // advance the read position, wrapping at the end of the buffer
        Inc(FQueueOffset);
        if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
      end;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Pops one item into AItem and returns the wait result; the queue size is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  { Pops one item and reports the remaining queue size; the wait result is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  { Pushes AItem and returns the wait result; the resulting queue size is discarded. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  { Appends AItem at the logical end of the ring buffer.
    The buffer is grown first when it is about to fill up (by the current
    size below 1024 items, by half of it otherwise). Nothing is stored when
    the queue is shut down or the wait fails. AQueueSize always receives the
    queue size on exit. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    growBy: Integer;
  begin
    FQueueLock.Enter;
    try
      if FQueueSize >= High(FQueue) then
      begin
        if FQueueSize < 1024 then growBy := FQueueSize
        else growBy := FQueueSize Div 2;
        Grow(growBy);
      end;
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPushTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
        {$ENDIF}
      end;
      if not (FShutDown or (Result <> wrSignaled)) then
      begin
        // the queue was empty: signal that an item is about to be available
        if FQueueSize = 0 then
        begin
          {$IFDEF FPC}
          FQueueCondVar.SetEvent;
          {$ELSE}
          FQueueCondVar.ReleaseAll;
          {$ENDIF}
        end;
        FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
        Inc(FQueueSize);
        Inc(FTotalItemsPushed);
      end;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Marks the queue as shut down and signals the condition variable so that
    blocked PopItem / PushItem loops can re-check the flag. }
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FShutDown := True;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  734. { TThreadedQueueList<T> }
  { Creates the TQueue-backed queue with an initial capacity of AQueueDepth
    and the given push/pop timeouts. }
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    fPushTimeout := PushTimeout;
    fPopTimeout := PopTimeout;
    fQueueSize := 0;
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    // synchronization primitives
    fQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create;
    {$ELSE}
    fQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  { Flags the queue as shut down, then frees the condition variable, the lock
    and the inner queue, in that order. }
  destructor TThreadedQueueList<T>.Destroy;
  begin
    DoShutDown;
    fQueueCondVar.Free;
    fQueueLock.Free;
    fQueue.Free;
    inherited;
  end;
  { Raises the inner queue capacity by ADelta under the lock and then signals
    the condition variable. }
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newCapacity: Integer;
  begin
    fQueueLock.Enter;
    try
      newCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := newCapacity;
    finally
      fQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Pops one item, discarding the wait result and the remaining queue size. }
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  {$IFDEF FPC}
  { FPC variant: waits on the event WITHOUT holding the lock, then takes the
    lock only to extract the item. AItem is Default(T) when nothing was
    extracted; AQueueSize receives the queue size on exit. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FPopTimeout);
      end;
      fQueueLock.Enter;
      try
        if (Result = wrSignaled) and not (FShutDown and (fQueueSize = 0)) then
        begin
          AItem := fQueue.Extract;
          Dec(FQueueSize);
          Inc(fTotalItemsPopped);
        end;
      finally
        fQueueLock.Leave;
      end;
    finally
      AQueueSize := fQueueSize;
    end;
  end;
  {$ELSE}
  { Delphi variant: waits on the condition variable while holding the lock and
    extracts the head item. AItem is Default(T) when nothing was extracted;
    AQueueSize receives the queue size on exit. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
      end;
      if (Result = wrSignaled) and not (FShutDown and (fQueueSize = 0)) then
      begin
        AItem := fQueue.Extract;
        // compares the counter (not yet decremented) with the inner queue count
        if fQueueSize = fQueue.Count then
        begin
          FQueueCondVar.ReleaseAll;
        end;
        Dec(FQueueSize);
        Inc(fTotalItemsPopped);
      end;
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  { Pops one item into AItem and returns the wait result; the queue size is discarded. }
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  { Pops one item and reports the remaining queue size; the wait result is discarded. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  { Pushes AItem and returns the wait result; the resulting queue size is discarded. }
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  {$IFDEF FPC}
  { FPC variant: grows the inner queue by 10 when the counter reaches its
    count, then enqueues AItem unless the queue is shut down. It never waits,
    so the result is always wrSignaled. AQueueSize receives the size on exit. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      if FQueueSize >= fQueue.Count then Grow(10);
      Result := wrSignaled;
      if not (fShutDown or (Result <> wrSignaled)) then
      begin
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ELSE}
  { Delphi variant: enqueues AItem unless the queue is shut down, releasing
    waiters first when the queue was empty. It never waits, so the result is
    always wrSignaled. AQueueSize receives the size on exit. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      if not (fShutDown or (Result <> wrSignaled)) then
      begin
        if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  { Marks the queue as shut down and signals the condition variable so that
    blocked PopItem loops can re-check the flag. }
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fShutDown := True;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  909. {$IFNDEF FPC}
  910. { TThreadObjectList<T> }
  { Adds Item under the list lock, honouring the Duplicates setting:
    duplicates are added (dupAccept), rejected with EListError (dupError)
    or silently skipped (any other value). }
  procedure TThreadObjectList<T>.Add(const Item: T);
  var
    canAdd: Boolean;
  begin
    LockList;
    try
      canAdd := (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1);
      if canAdd then
        fList.Add(Item)
      else if Duplicates = dupError then
        raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
    finally
      UnlockList;
    end;
  end;
  { Clears the inner list while holding the list lock. }
  procedure TThreadObjectList<T>.Clear;
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.Clear;
    finally
      UnlockList;
    end;
  end;
  { Creates the lock object and the inner object list.
    OwnedObjects is forwarded to the inner TObjectList<T>, so passing False
    yields a list that does not free its items (previously the parameter was
    ignored and the list always owned them). Duplicates defaults to dupIgnore. }
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  { Frees the inner list while holding the lock, then releases and frees the
    lock object itself. }
  destructor TThreadObjectList<T>.Destroy;
  begin
    LockList;
    try
      fList.Free;
      inherited Destroy;
    finally
      // the lock object must outlive the unlock call
      UnlockList;
      fLock.Free;
    end;
  end;
  { Returns the item at aIndex, read under the list lock. }
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      Result := locked[aIndex];
    finally
      UnlockList;
    end;
  end;
  { Enters the monitor on the lock object and returns the inner list.
    Every call must be paired with UnlockList. }
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;
  { Removes Item, searching from the beginning of the list. }
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    RemoveItem(Item, TDirection.FromBeginning);
  end;
  { Removes Item from the inner list under the lock, searching in the given Direction. }
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  { Stores aValue at aIndex under the list lock. }
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  { Leaves the monitor entered by LockList. }
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  988. {$ENDIF}
  989. { TAnonymousThread }
  { Wraps aProc in a TAdvThread; aSynchronize is forwarded to that thread. }
  constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    fThread := TAdvThread.Create(aProc,aSynchronize);
  end;
  { Builds an anonymous thread for aProc with the synchronize flag off. }
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc,False);
  end;
  { Builds an anonymous thread for aProc with the synchronize flag on. }
  class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc,True);
  end;
  { Registers the exception callback on the inner thread; returns Self for chaining. }
  function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  begin
    fThread.OnException(aProc);
    Result := Self;
  end;
  { Registers the terminate callback (synchronize flag off); returns Self for chaining. }
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,False);
    Result := Self;
  end;
  { Registers the terminate callback (synchronize flag on); returns Self for chaining. }
  function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,True);
    Result := Self;
  end;
  { Starts the inner thread. }
  procedure TAnonymousThread.Start;
  begin
    fThread.Start;
  end;
  1021. { TTask }
  { Creates a pending, disabled task that will run aTaskProc.
    Each entry of aParamArray becomes a TParamValue named after its index
    ("0", "1", ...) and flagged with aOwnedParams. }
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    // state
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fEnabled := False;
    fNumRetries := 0;
    fCustomFaultPolicy := False;
    // synchronization flags
    fExecuteWithSync := False;
    fTerminateWithSync := False;
    fExceptionWithSync := False;
    // fault control
    fFaultControl := TFaultControl.Create;
    fFaultControl.OnRetry := DoRetry;
    // parameters
    fOwnedParams := aOwnedParams;
    fParamList := TParamList.Create(True);
    for idx := Low(aParamArray) to High(aParamArray) do
    begin
      fParamList.Add(TParamValue.Create(idx.ToString,aParamArray[idx],aOwnedParams));
      {$IFDEF FPC}
      fParamList[idx].Value._AddRef;
      {$ENDIF}
    end;
    fExecuteProc := aTaskProc;
  end;
  { Frees the fault control, the parameter list and, when the task result
    holds an object, that object too. }
  destructor TTask.Destroy;
  begin
    fFaultControl.Free;
    // releases the parameters passed to the task
    fParamList.Free;
    if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then
      fResult.AsObject.Free;
    inherited;
  end;
  { Clears the enabled flag. }
  procedure TTask.Disable;
  begin
    fEnabled := False;
  end;
  { Marks the task as failed and hands aException to the exception callback;
    without a callback the exception is raised. }
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    if not Assigned(fExceptProc) then raise aException;
    fExceptProc(Self,aException);
  end;
  { Runs the task: initializes it, then calls the execute procedure until it
    succeeds or the fault control reports that no retry is needed.
    An exception raised by the procedure is acquired (taken over from the
    handler) and passed to the fault control. }
  procedure TTask.DoExecute;
  var
    raised : Exception;
  begin
    fTaskStatus := TWorkTaskStatus.wtsRunning;
    DoInitialize;
    repeat
      try
        if Assigned(fExecuteProc) then fExecuteProc(Self);
        fTaskStatus := TWorkTaskStatus.wtsDone;
        fFaultControl.SuccessExecution;
      except
        on E : Exception do
        begin
          fTaskStatus := TWorkTaskStatus.wtsException;
          {$IFNDEF FPC}
            {$IF DELPHIRX10_UP}
            raised := AcquireExceptionObject as Exception;
            {$ELSE}
            raised := Exception(AcquireExceptionObject);
            {$ENDIF}
          {$ELSE}
          raised := Exception(AcquireExceptionObject);
          {$ENDIF}
          fFaultControl.FailedExecution(raised);
        end;
      end;
    until not fFaultControl.NeedToRetry;
  end;
  { Resets the fault control and runs the initialize callback, if any.
    Any exception is re-raised as ETaskInitializationError carrying the
    original message. }
  procedure TTask.DoInitialize;
  begin
    try
      fFaultControl.Reset;
      if Assigned(fInitializeProc) then
        fInitializeProc(Self);
    except
      on E : Exception do
        raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
    end;
  end;
  { True while the fault control does not report the task as failed. }
  function TTask.Done: Boolean;
  begin
    Result := not fFaultControl.TaskFailed;
  end;
  { True when the fault control reports the task as failed. }
  function TTask.Failed: Boolean;
  begin
    Result := fFaultControl.TaskFailed;
  end;
  { Returns the CircuitBreaked flag of the fault control. }
  function TTask.CircuitBreaked: Boolean;
  begin
    Result := fFaultControl.CircuitBreaked;
  end;
  { Returns the last exception recorded by the fault control. }
  function TTask.LastException: Exception;
  begin
    Result := fFaultControl.LastException;
  end;
  { Returns the maximum number of retries configured in the fault control. }
  function TTask.MaxRetries: Integer;
  begin
    Result := fFaultControl.MaxRetries;
  end;
  { Returns the retry counter kept by the fault control. }
  function TTask.NumRetries: Integer;
  begin
    Result := fFaultControl.NumRetries;
  end;
  { Retry handler wired to the fault control: retries continue by default and
    the retry callback, when assigned, may set vStopRetries. }
  procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  begin
    vStopRetries := False;
    if not Assigned(fRetryProc) then Exit;
    fRetryProc(Self,aRaisedException,vStopRetries);
  end;
  { Invokes the terminate callback when one is assigned. }
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  { Sets the enabled flag. }
  procedure TTask.Enable;
  begin
    fEnabled := True;
  end;
  { Returns the task identifier. }
  function TTask.GetIdTask: Int64;
  begin
    Result := fIdTask;
  end;
  { Copies retries, wait time and multiplier factor from aFaultPolicy into
    the fault control. Under FPC the fault control is created on demand. }
  procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  begin
    {$IFDEF FPC}
    if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
    {$ENDIF}
    fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
    fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  end;
  { Stores the task identifier. }
  procedure TTask.SetIdTask(Value : Int64);
  begin
    fIdTask := Value;
  end;
  { Returns the worker number stored by SetNumWorker. }
  function TTask.GetNumWorker: Integer;
  begin
    Result := fNumWorker;
  end;
  { Returns the value of the parameter at position aIndex (no bounds check here). }
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  begin
    Result := fParamList[aIndex].Value;
  end;
  { Returns the value of the first parameter whose name matches aName
    (case-insensitive). Raises ETaskParamError when no parameter matches. }
  function TTask.GetParam(const aName: string): TFlexValue;
  var
    idx : Integer;
  begin
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) = 0 then Exit(fParamList[idx].Value);
    end;
    // no parameter with that name
    raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  end;
  { Returns a pointer to the value of the parameter at position aIndex. }
  function TTask.GetParam2(aIndex: Integer): PFlexValue;
  begin
    Result := @fParamList[aIndex].Value;
  end;
  { Returns the task result value. }
  function TTask.GetResult: TFlexValue;
  begin
    Result := fResult;
  end;
  { Returns the enabled flag. }
  function TTask.IsEnabled: Boolean;
  begin
    Result := fEnabled;
  end;
  { Stores the worker number and moves the task to the wtsAssigned status. }
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fNumWorker := Value;
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
  end;
  { Replaces the value of the parameter at position aIndex.
    Raises ETaskParamError when aIndex is outside 0..Count-1. The previous
    check used ">" and let aIndex = Count (and negative indexes) through to
    the list, which failed with a generic range error instead. }
  procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  begin
    if (aIndex < 0) or (aIndex >= fParamList.Count) then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
    fParamList[aIndex].Value := Value;
  end;
  { Sets the parameter called aName (case-insensitive match). An existing
    parameter only gets its value replaced; otherwise a new parameter is
    appended with the given aOwned flag. }
  procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  var
    idx : Integer;
  begin
    // update in place when the parameter already exists
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) = 0 then
      begin
        fParamList[idx].Value := Value;
        Exit;
      end;
    end;
    // otherwise create it
    fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  end;
  { Sets the parameter called aName as a non-owned value. }
  procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  begin
    SetParam(aName,Value,False);
  end;
  { Configures the fault control with a retry count, a wait time between
    retries and a wait-time multiplier, and flags the policy as custom. }
  procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := aMaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
    fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  end;
  { Stores the task result value. }
  procedure TTask.SetResult(aValue: TFlexValue);
  begin
    fResult := aValue;
  end;
  { Configures the fault control from an array of wait times: one retry per
    array entry, no fixed wait, multiplier 1. Flags the policy as custom. }
  procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
    fFaultControl.WaitTimeBetweenRetriesMS := 0;
    fFaultControl.WaitTimeMultiplierFactor := 1;
    fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  end;
  { Returns the current task status. }
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Result := fTaskStatus;
  end;
  1243. { TWorkTask }
  { Registers the exception callback; returns Self for chaining. }
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  { Registers the exception callback and sets the exception-with-sync flag. }
  function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  begin
    fExceptionWithSync := True;
    Result := OnException(aTaskProc);
  end;
  { Registers the initialize callback; returns Self for chaining. }
  function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  { Registers the retry callback; returns Self for chaining. }
  function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  { Registers the terminate callback; returns Self for chaining. }
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  { Registers the terminate callback and sets the terminate-with-sync flag. }
  function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    fTerminateWithSync := True;
    Result := OnTerminated(aTaskProc);
  end;
  { Enables the task by setting the enabled flag. }
  procedure TWorkTask.Run;
  begin
    fEnabled := True;
  end;
  { Sets a named, non-owned parameter; returns Self for chaining. }
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  begin
    SetParam(aName,aValue);
    Result := Self;
  end;
  { Sets a named parameter with an explicit owned flag; returns Self for chaining. }
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  begin
    SetParam(aName,aValue,aOwned);
    Result := Self;
  end;
  { Retries up to aMaxRetries times with no wait (wait 0, multiplier 1). }
  function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  { Sets a retry policy with max retries -1, wait 0 and multiplier 1. }
  function TWorkTask.RetryForever: IWorkTask;
  begin
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  { Retries up to aMaxRetries times with a fixed wait (multiplier 1). }
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  { Retries up to aMaxRetries times with the given wait time and multiplier factor. }
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  { Sets a retry policy driven by an array of wait times. }
  function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  { Sets a retry policy with max retries -1, a fixed wait and multiplier 1. }
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  { Sets a retry policy with max retries -1 and the given wait time and multiplier factor. }
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  1323. { TBackgroundTasks }
  { Discards every queued task by clearing the task queue. }
  procedure TBackgroundTasks.CancelAll;
  begin
    fTaskQueue.Clear;
  end;
  { Creates the task queue with aInitialQueueSize slots. Defaults: no queue
    limit (MaxQueue 0), infinite insert timeout, 5000 ms extract timeout.
    Workers are not created here; see Start. }
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fMaxQueue := 0;
    fNumPushedTasks := 0;
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  end;
  { Cancels pending tasks, shuts the queue down so blocked workers are
    released, then frees the worker pool and the queue.
    The queue is only touched when it exists: the destructor also runs for a
    partially constructed instance (e.g. when the queue constructor raised),
    where fTaskQueue is still nil. }
  destructor TBackgroundTasks.Destroy;
  begin
    if Assigned(fTaskQueue) then
    begin
      CancelAll;
      fTaskQueue.DoShutDown;
    end;
    if Assigned(fWorkerPool) then fWorkerPool.Free;
    if Assigned(fTaskQueue) then fTaskQueue.Free;
    inherited;
  end;
  { Returns the current size of the task queue. }
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Result := fTaskQueue.QueueSize;
  end;
  { Queues a task without parameters. }
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  begin
    Result := AddTask([],False,aTaskProc);
  end;
  { Creates a work task, gives it the next task id and pushes it to the queue.
    Raises ETaskAddError when MaxQueue is set and already reached.
    The task is returned only when the push is signaled. }
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    newTask : IWorkTask;
    pushResult : TWaitResult;
  begin
    if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
    newTask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    Inc(fNumPushedTasks);
    newTask.SetIdTask(fNumPushedTasks);
    pushResult := fTaskQueue.PushItem(newTask);
    if pushResult = TWaitResult.wrSignaled then Result := newTask;
  end;
  { Queues a task like AddTask and flags it to execute with synchronization.
    AddTask returns nil when the queue rejects the push (shutdown or timeout);
    in that case there is no task to flag and nil is returned as-is instead
    of dereferencing it. }
  function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
    if Result <> nil then TTask(Result).ExecuteWithSync := True;
  end;
  { Queues a parameterless task flagged to execute with synchronization. }
  function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask_Sync([],False,aTaskProc);
  end;
  { (Re)creates the worker pool and starts ConcurrentWorkers queue workers,
    numbered from 1, all reading from the shared task queue. }
  procedure TBackgroundTasks.Start;
  var
    workerNum : Integer;
    newWorker : TWorker;
  begin
    // drop any previous pool; Free is a no-op on nil
    fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    for workerNum := 1 to fConcurrentWorkers do
    begin
      newWorker := TQueueWorker.Create(workerNum,fTaskQueue);
      fWorkerPool.Add(newWorker);
      newWorker.Start;
    end;
  end;
  1391. { TWorker }
  // Creates the worker thread suspended, with its own default fault policy,
  // status wsSuspended and FreeOnTerminate off.
  constructor TWorker.Create;
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fStatus := TWorkerStatus.wsSuspended;
    fDefaultFaultPolicy := TFaultPolicy.Create;
  end;
  // Frees the default fault policy created in the constructor.
  destructor TWorker.Destroy;
  begin
    // Free is nil-safe
    fDefaultFaultPolicy.Free;
    inherited;
  end;
  // Applies this worker's default fault policy to aTask unless the task
  // reports a custom one.
  procedure TWorker.SetFaultPolicy(aTask: TTask);
  begin
    if aTask.CustomFaultPolicy then Exit;
    aTask.SetFaultPolicy(fDefaultFaultPolicy);
  end;
  // Base thread body: intentionally empty; the concrete workers in this unit
  // provide their own Execute.
  procedure TWorker.Execute;
  begin
    // nothing to do in the base class
  end;
  // Parameterless wrapper around the current task's DoExecute; the workers
  // pass it to Synchronize.
  procedure TWorker.ExecuteTask;
  begin
    Self.fCurrentTask.DoExecute;
  end;
  // Parameterless wrapper around the current task's DoTerminate; the workers
  // pass it to Synchronize.
  procedure TWorker.TerminateTask;
  begin
    Self.fCurrentTask.DoTerminate;
  end;
  1419. { TSimpleWorker }
  // Creates a self-freeing worker bound to a single task.
  // aRunOnce: when True the worker terminates after the first run of the task.
  constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
  begin
    inherited Create;
    FreeOnTerminate := True;
    fCurrentTask := aTask;
    fRunOnce := aRunOnce;
  end;
  // Thread body: while not terminated, runs the bound task whenever it is
  // assigned and enabled (through Synchronize when the task asks for it),
  // forwards exceptions to the task's DoException and then fires the task's
  // terminate callback. With fRunOnce set, the worker terminates after the
  // first run.
  procedure TSimpleWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
      begin
        try
          fStatus := TWorkerStatus.wsWorking;
          try
            SetFaultPolicy(TTask(fCurrentTask));
            if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
              else fCurrentTask.DoExecute;
          except
            on E : Exception do
            begin
              if fCurrentTask <> nil then fCurrentTask.DoException(E)
                else raise ETaskExecutionError.Create(e.Message);
            end;
          end;
        finally
          fStatus := TWorkerStatus.wsIdle;
          try
            if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
              else fCurrentTask.DoTerminate;
          except
            on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
          end;
          if fRunOnce then Terminate;
        end;
      end
      // Nothing runnable yet (task missing or not enabled): yield instead of
      // spinning, otherwise this loop burns a full core while it waits.
      else Sleep(1);
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1459. { TQueueWorker }
  // Creates a pool worker that pulls its tasks from aTaskQueue.
  // aNumWorker is stored as the worker's number.
  constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  begin
    inherited Create;
    fTaskQueue := aTaskQueue;
    fNumWorker := aNumWorker;
  end;
  // Thread body: pops tasks from the shared queue until terminated. Each task
  // is executed (through Synchronize when it asks for it), exceptions are
  // forwarded to the task's DoException, and the terminate callback is fired
  // afterwards.
  procedure TQueueWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      fCurrentTask := fTaskQueue.PopItem;
      if fCurrentTask <> nil then
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fCurrentIdTask := fCurrentTask.GetIdTask;
          SetFaultPolicy(TTask(fCurrentTask));
          if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
            else fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
              else raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        // Protect the terminate callback the same way TSimpleWorker does: an
        // exception escaping here would leave Execute and silently remove this
        // worker from the pool.
        try
          if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
            else fCurrentTask.DoTerminate;
        except
          on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
        end;
        fStatus := TWorkerStatus.wsIdle;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1495. { TScheduledWorker }
  // Creates a self-freeing worker that runs one scheduled task.
  // aNumWorker is accepted but not used by this constructor.
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create;
    {$IFNDEF DELPHILINUX}
    // The second argument of NameThreadForDebugging is a thread id; the task
    // id was being passed, which labels an unrelated (or no) thread. Use this
    // worker's own ThreadID so the debugger name lands on the right thread.
    NameThreadForDebugging(aScheduledTask.Name,ThreadID);
    {$ENDIF}
    FreeOnTerminate := True;
    fCurrentTask := aScheduledTask;
  end;
  // Thread body: runs the bound scheduled task once, forwards exceptions to
  // the task's DoException, fires the terminate callback and, when the task
  // reports IsFinished, the expire callback. Releases the task afterwards.
  procedure TScheduledWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    if fCurrentTask <> nil then
    begin
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          SetFaultPolicy(TTask(fCurrentTask));
          if not TTask(fCurrentTask).ExecuteWithSync then fCurrentTask.DoExecute
            else Synchronize(ExecuteTask);
          fStatus := TWorkerStatus.wsIdle;
        except
          on E : Exception do
          begin
            if fCurrentTask = nil then raise ETaskExecutionError.Create(e.Message);
            fCurrentTask.DoException(E);
          end;
        end;
      finally
        if not TTask(fCurrentTask).TerminateWithSync then fCurrentTask.DoTerminate
          else Synchronize(TerminateTask);
        //check if expired
        if (fCurrentTask as IScheduledTask).IsFinished then
        begin
          if not TScheduledTask(fCurrentTask).ExpireWithSync then (fCurrentTask as IScheduledTask).DoExpire
            else Synchronize(ExpireTask);
        end;
      end;
    end;
    // drop the task reference before reporting the worker as suspended
    fCurrentTask := nil;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  // Parameterless wrapper that fires the current task's DoExpire; passed to
  // Synchronize by Execute.
  procedure TScheduledWorker.ExpireTask;
  begin
    (Self.fCurrentTask as IScheduledTask).DoExpire;
  end;
  1542. { TScheduledTasks }
  // Convenience overload: registers a named task with an empty, non-owned
  // parameter array.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := Self.AddTask(aTaskName,[],False,aTaskProc);
  end;
  // Creates a scheduled task with the given name, parameters and procedure,
  // gives it the next sequential id, appends it to the task list and returns it.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newtask : TScheduledTask;
  begin
    newtask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.Name := aTaskName;
    // ids are handed out from a running counter
    Inc(fNumPushedTasks);
    newtask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newtask);
    Exit(newtask);
  end;
  // Registers a named task like AddTask and flags it to execute through
  // Synchronize (ExecuteWithSync = True).
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self.AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
    TTask(Result).ExecuteWithSync := True;
  end;
  // Convenience overload: registers a synchronized named task with an empty,
  // non-owned parameter array.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self.AddTask_Sync(aTaskName,[],False,aTaskProc);
  end;
  // Initializes an empty, not-started task scheduler front end with its own
  // task list and fault policy.
  constructor TScheduledTasks.Create;
  begin
    fIsStarted := False;
    fNumPushedTasks := 0;
    fTaskList := TScheduledTaskList.Create;
    fFaultPolicy := TFaultPolicy.Create;
  end;
  // Stops and frees the scheduler thread (if one was created), then frees the
  // task list and the fault policy.
  destructor TScheduledTasks.Destroy;
  begin
    if fScheduler <> nil then
    begin
      // make sure the scheduler thread has left Execute before freeing it
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    // Free is nil-safe
    fTaskList.Free;
    fFaultPolicy.Free;
    inherited;
  end;
  // Looks a scheduled task up by id through the scheduler.
  // Raises ETaskSchedulerError when the scheduler has not been created yet,
  // matching the by-name overload instead of dereferencing a nil scheduler.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  begin
    if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aIdTask);
  end;
  // Looks a scheduled task up by name through the scheduler.
  // Raises ETaskSchedulerError when the scheduler has not been created yet.
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  begin
    if fScheduler = nil then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Exit(fScheduler.Get(aTaskName));
  end;
  // Starts the scheduler thread over the current task list. Does nothing when
  // already started.
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;
    // A scheduler still assigned here is left over from a previous Stop: it
    // has been terminated and its thread was already started once, so calling
    // Start on it again would fail. Wait for it, dispose it and build a new one.
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      FreeAndNil(fScheduler);
    end;
    fScheduler := TScheduler.Create(fTaskList);
    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    fScheduler.Start;
    fIsStarted := True;
  end;
  // Asks the scheduler thread (if any) to terminate and marks the scheduler
  // as not started. Does not wait for the thread to finish.
  procedure TScheduledTasks.Stop;
  begin
    if fScheduler <> nil then fScheduler.Terminate;
    fIsStarted := False;
  end;
  1611. { TScheduledTask }
  // Fluent setter: stores a named parameter on the task and returns the task.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  begin
    SetParam(aName,aValue);
    Result := Self;
  end;
  // Fluent setter: stores a named parameter on the task, together with its
  // ownership flag, and returns the task.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  begin
    Result := Self;
    // aOwned was accepted but silently dropped, so the ownership flag never
    // reached the parameter.
    // NOTE(review): relies on the three-argument SetParam overload declared on
    // the task base class — confirm it is available.
    SetParam(aName,aValue,aOwned);
  end;
  // Resets the schedule and plans a run-once execution at aStartDate.
  // Returns the task for fluent chaining.
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := aStartDate;
    fNextExecution := aStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution aMinutes minutes from
  // now. Returns the task for fluent chaining.
  function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := IncMinute(Now(),aMinutes);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution aSeconds seconds from
  // now. Returns the task for fluent chaining.
  function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := IncSecond(Now(),aSeconds);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution at the current time.
  // Returns the task for fluent chaining.
  function TScheduledTask.StartNow: IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := Now();
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution for tomorrow's date
  // with the time set to 0:00:00. Returns the task for fluent chaining.
  function TScheduledTask.StartOnDayChange: IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution for today's date at the
  // given hour, minute and second. Returns the task for fluent chaining.
  function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and plans a run-once execution for tomorrow's date at
  // the given hour, minute and second. Returns the task for fluent chaining.
  function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with wait 0 and
  // multiplier 1.
  function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  // Fluent: sets the retry policy with max retries -1, wait 0 and multiplier 1.
  function TScheduledTask.RetryForever: IScheduledTask;
  begin
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with the given wait
  // time (ms) and multiplier 1.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with the given wait
  // time (ms) and wait-time multiplier factor.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Fluent: sets the retry policy from an array of wait times.
  function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  // Fluent: sets the retry policy with max retries -1, the given wait time
  // (ms) and multiplier 1.
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Fluent: sets the retry policy with max retries -1, the given wait time
  // (ms) and wait-time multiplier factor.
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Switches the task to repeat mode with the given interval and unit, and
  // enables it. A start date in the past (or unset) is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    // no start date configured yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Switches the task to repeat mode with the given interval and unit, sets
  // aEndTime as the expiration date, and enables the task. A start date in the
  // past (or unset) is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    // no start date configured yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fExpirationDate := aEndTime;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Shortcut for repeating every 1 day.
  procedure TScheduledTask.RepeatEveryDay;
  begin
    Self.RepeatEvery(1,tmDays);
  end;
  // Shortcut for repeating every 7 days.
  procedure TScheduledTask.RepeatEveryWeek;
  begin
    Self.RepeatEvery(7,tmDays);
  end;
  // Switches the task to repeat mode with the given interval and unit, stores
  // aRepeatTimes as the execution-count limit, and enables the task. A start
  // date in the past (or unset) is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    // no start date configured yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fExpirationTimes := aRepeatTimes;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Switches the task to run-once mode and enables it. A start date in the
  // past (or unset) is moved to the current time.
  procedure TScheduledTask.RunOnce;
  begin
    fScheduleMode := TScheduleMode.smRunOnce;
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Marks the task as finished.
  procedure TScheduledTask.Cancel;
  begin
    Self.fFinished := True;
  end;
  // Decides whether the task is due to run now and updates its schedule
  // bookkeeping. Returns True when an execution should be launched.
  //  - Never due while the task status is wtsRunning.
  //  - Run-once mode: due once, when no execution happened yet and the next
  //    execution time has been reached; the task is then marked finished.
  //  - Repeat mode: when the next execution time has been reached, the task is
  //    either marked finished (execution-count or expiration-date limit hit)
  //    or its next execution time is advanced by one interval.
  function TScheduledTask.CheckSchedule: Boolean;
  begin
    Result := False;
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;

    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      //if start date not reached (or already executed) there is nothing to do
      if (fExecutionTimes <> 0) or (Now() < fNextExecution) then Exit;
      fLastExecution := Now();
      Inc(fExecutionTimes);
      fFinished := True;
      Result := True;
      Exit;
    end;

    //repeat mode: wait until next execution is reached
    if Now() < fNextExecution then Exit;

    //check expiration limits
    if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
       ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
    begin
      fFinished := True;
      Exit;
    end;

    //calculate next execution
    case fTimeMeasure of
      tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
      tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
      tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
      tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
    end;

    //avoid execution if system time was altered (next slot is still in the past)
    if Now() <= fNextExecution then
    begin
      fLastExecution := Now();
      Inc(fExecutionTimes);
      Result := True;
    end;
  end;
  // Runs the inherited reset, then returns every scheduling field set here to
  // its default: not finished, no dates, run-once mode, seconds unit, interval 0.
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    fFinished := False;
    //dates
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
    //mode and interval
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeInterval := 0;
    fTimeMeasure := TTimeMeasure.tmSeconds;
  end;
  // Fires the expired callback (when one is assigned) and disables the task.
  procedure TScheduledTask.DoExpire;
  begin
    if Assigned(fExpiredProc) then
    begin
      fExpiredProc(Self);
    end;
    fEnabled := False;
  end;
  // Returns the current schedule as a (time measure, interval) pair.
  function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  begin
    Result.Key := fTimeMeasure;
    Result.Value := fTimeInterval;
  end;
  // Returns the task's name.
  function TScheduledTask.GetTaskName: string;
  begin
    Exit(fName);
  end;
  // Returns the task's finished flag.
  function TScheduledTask.IsFinished: Boolean;
  begin
    Exit(fFinished);
  end;
  // Fluent: stores the exception callback and returns the task.
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Fluent: stores the exception callback like OnException and flags it to be
  // invoked with synchronization (ExceptionWithSync = True).
  function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  var
    chained : IScheduledTask;
  begin
    chained := OnException(aTaskProc);
    TTask(chained).ExceptionWithSync := True;
    Result := chained;
  end;
  // Fluent: stores the retry callback and returns the task.
  function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  // Fluent: stores the expired callback and returns the task.
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  // Fluent: stores the expired callback like OnExpired and flags it to be
  // invoked with synchronization (ExpireWithSync = True).
  function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  var
    chained : IScheduledTask;
  begin
    chained := OnExpired(aTaskProc);
    TScheduledTask(chained).ExpireWithSync := True;
    Result := chained;
  end;
  // Fluent: stores the initialize callback and returns the task.
  function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  // Fluent: stores the terminate callback and returns the task.
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Fluent: stores the terminate callback like OnTerminated and flags it to be
  // invoked with synchronization (TerminateWithSync = True).
  function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  var
    chained : IScheduledTask;
  begin
    chained := OnTerminated(aTaskProc);
    TTask(chained).TerminateWithSync := True;
    Result := chained;
  end;
  1878. { TScheduler }
  // Creates the scheduler thread suspended, bound to aTaskList (the list is
  // referenced, not copied), with its own list lock and wake-up event.
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  // Stops the scheduler thread and releases its event and lock. The task list
  // is only dereferenced, not freed, by this class.
  destructor TScheduler.Destroy;
  begin
    // Ask Execute to leave its loop and wake it from the timed wait.
    Terminate;
    if Assigned(fCondVar) then fCondVar.SetEvent;
    // Run the inherited destructor first: it waits for the thread to finish.
    // Freeing the event and the lock before that (as was done previously)
    // lets a still-running Execute touch already freed objects.
    inherited;
    fCondVar.Free;
    fTaskList := nil;
    fListLock.Free;
  end;
  // Scheduler loop: until terminated, walks the task list under the list lock,
  // launches a TScheduledWorker for every enabled, unfinished task whose
  // CheckSchedule reports it is due, optionally removes disabled tasks, and
  // then waits up to 250 ms on the wake-up event.
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    expired : TArray<IScheduledTask>;
  begin
    numworker := 0;
    while not Terminated do
    begin
      fListLock.Enter;
      try
        expired := nil;
        for task in fTaskList do
        begin
          if (task.IsEnabled) and (not task.IsFinished) then
          begin
            if task.CheckSchedule then
            begin
              Inc(numworker);
              worker := TScheduledWorker.Create(numworker,task);
              worker.Start;
            end;
          end
          else
          begin
            // Only remember the task here: removing from the list while it is
            // being enumerated makes the enumeration skip the following item.
            if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then
            begin
              SetLength(expired,Length(expired) + 1);
              expired[High(expired)] := task;
            end;
          end;
        end;
        // enumeration is over, now it is safe to drop the collected tasks
        for task in expired do fTaskList.Remove(task);
        expired := nil;
        task := nil;
      finally
        fListLock.Leave;
      end;
      fCondVar.WaitFor(250);
    end;
  end;
  // Appends aTask to the task list and returns the value reported by the
  // list's Add. Takes the list lock, like Execute and Get do, so the list is
  // not modified while the scheduler thread is enumerating it.
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    fListLock.Enter;
    try
      Result := fTaskList.Add(aTask);
    finally
      fListLock.Leave;
    end;
  end;
  // Returns the first task in the list whose IdTask equals aIdTask, or nil
  // when there is no such task. The search runs under the list lock.
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    // An interface-typed Result is not guaranteed to start as nil; without
    // this the "not found" outcome would be undefined.
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  // Returns the first task in the list whose Name matches aTaskName
  // (CompareText, i.e. case-insensitive), or nil when there is no such task.
  // The search runs under the list lock.
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    // An interface-typed Result is not guaranteed to start as nil; without
    // this the "not found" outcome would be undefined.
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  1966. { TAdvThread }
  // Creates a suspended, self-freeing thread that will run aProc.
  // aSynchronize: when True, Execute runs the procedure through Synchronize.
  constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    inherited Create(True);
    fExecuteProc := aProc;
    fExecuteWithSync := aSynchronize;
    FreeOnTerminate := True;
  end;
  // Runs the configured procedure. An exception is handed to the exception
  // callback when one is assigned (the callback receives the acquired
  // exception object); otherwise it is re-raised.
  procedure TAdvThread.DoExecute;
  begin
    try
      if Assigned(fExecuteProc) then fExecuteProc;
    except
      on E : Exception do
      begin
        {$IFNDEF FPC}
          {$IF DELPHIRX10_UP}
          if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
          {$ELSE}
          if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
          {$ENDIF}
        {$ELSE}
        if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
        {$ENDIF}
        // Re-raise with a bare "raise": "raise e" raises the same object again
        // while the handler still frees it on exit, so the exception that
        // propagates is already destroyed.
        else raise;
      end;
    end;
  end;
  // Invokes the terminate callback when one is assigned.
  procedure TAdvThread.CallToTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc;
  end;
  // Fires the terminate callback, through Synchronize when fTerminateWithSync
  // is set and directly otherwise.
  procedure TAdvThread.DoTerminate;
  begin
    if not fTerminateWithSync then CallToTerminate
      else Synchronize(CallToTerminate);
  end;
  // Thread body: runs DoExecute, through Synchronize when fExecuteWithSync is
  // set and directly otherwise.
  procedure TAdvThread.Execute;
  begin
    if not fExecuteWithSync then DoExecute
      else Synchronize(DoExecute);
  end;
  // Stores the callback that DoExecute hands exceptions to.
  procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  begin
    Self.fExceptionProc := aProc;
  end;
  // Stores the terminate callback and whether it must be invoked through
  // Synchronize.
  procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  begin
    fTerminateProc := aProc;
    fTerminateWithSync := aSynchronize;
  end;
  2017. { TRunTask }
  // Convenience overload: launches aTaskProc with an empty, non-owned
  // parameter array.
  class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  begin
    Exit(Execute([],False,aTaskProc));
  end;
  // Convenience overload: launches aTaskProc as a synchronized task with an
  // empty, non-owned parameter array.
  class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Exit(Execute_Sync([],False,aTaskProc));
  end;
  // Creates a work task (ExecuteWithSync = False), hands it to a new
  // TSimpleWorker created with its default run-once setting, starts the
  // worker and returns the task.
  class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    newtask : TWorkTask;
    runner : TSimpleWorker;
  begin
    newtask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.ExecuteWithSync := False;
    // take the interface reference before the worker thread gets the task
    Result := newtask;
    runner := TSimpleWorker.Create(newtask);
    runner.Start;
  end;
  // Creates a work task (ExecuteWithSync = True), hands it to a new
  // TSimpleWorker created with its default run-once setting, starts the
  // worker and returns the task.
  class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    newtask : TWorkTask;
    runner : TSimpleWorker;
  begin
    newtask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.ExecuteWithSync := True;
    // take the interface reference before the worker thread gets the task
    Result := newtask;
    runner := TSimpleWorker.Create(newtask);
    runner.Start;
  end;
  2048. { TParamValue }
  // Creates a named parameter holding a TFlexValue; aOwnedValue is stored as
  // the ownership flag checked by the destructor.
  constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Creates a named parameter from a TVarRec value; aOwnedValue is stored as
  // the ownership flag checked by the destructor.
  constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Creates an unnamed, non-owned parameter.
  constructor TParamValue.Create;
  begin
    fOwned := False;
    fName := '';
  end;
  // Releases the parameter. When the value is flagged as owned and holds a
  // non-nil object, that object is freed too.
  destructor TParamValue.Destroy;
  begin
    {$IFDEF FPC}
    fValue._Release;
    {$ENDIF}
    if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then
    begin
      fValue.AsObject.Free;
    end;
    inherited;
  end;
  2076. { TBackgroundWorkers }
  // Creates the background workers manager.
  // aConcurrentWorkers: number of workers Start will launch.
  // aWorkerProc: procedure every worker task executes.
  constructor TBackgroundWorkers.Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fWorkerExecuteProc := aWorkerProc;
    fWorkerPool := TWorkerPool.Create(True);
    // SetRetryPolicy writes straight into fFaultPolicy, but nothing created
    // it, so Retry/RetryForever dereferenced nil.
    // NOTE(review): assumes fFaultPolicy is declared as TFaultPolicy, like the
    // fault policies created elsewhere in this unit — confirm.
    fFaultPolicy := TFaultPolicy.Create;
  end;

  // Frees the worker pool and the fault policy created by the constructor.
  destructor TBackgroundWorkers.Destroy;
  begin
    fWorkerPool.Free;
    fFaultPolicy.Free;
    inherited;
  end;
  // Launches fConcurrentWorkers workers. Each one gets its own work task built
  // from the configured execute/initialize/retry/exception/terminate
  // callbacks and runs it repeatedly (TSimpleWorker with aRunOnce = False).
  procedure TBackgroundWorkers.Start;
  var
    i : Integer;
    worker : TWorker;
    task : IWorkTask;
  begin
    for i := 1 to fConcurrentWorkers do
    begin
      task := TWorkTask.Create([],False,fWorkerExecuteProc)
                .OnInitialize(fWorkerInitProc)
                .OnRetry(fWorkerRetryProc)
                .OnException(fWorkerExceptionProc)
                .OnTerminated(fWorkerTerminateProc);
      task.NumWorker := i;
      task.Run;
      worker := TSimpleWorker.Create(task,False);
      // TSimpleWorker.Create turns FreeOnTerminate on, but these workers are
      // kept in fWorkerPool and are waited for and removed in Stop. A thread
      // that also frees itself would be released twice, so the pool must be
      // the only owner.
      worker.FreeOnTerminate := False;
      fWorkerPool.Add(worker);
      worker.Start;
    end;
  end;
  // Terminates every worker in the pool, waits for it to finish and removes it
  // from the pool.
  procedure TBackgroundWorkers.Stop;
  var
    worker : TWorker;
    pending : TArray<TWorker>;
  begin
    // Take a snapshot first: removing items from the pool while it is being
    // enumerated makes the enumeration skip workers, leaving them running.
    pending := nil;
    for worker in fWorkerPool do
    begin
      SetLength(pending,Length(pending) + 1);
      pending[High(pending)] := worker;
    end;
    for worker in pending do
    begin
      worker.Terminate;
      worker.WaitFor;
      fWorkerPool.Remove(worker);
    end;
  end;
  // Fluent: stores the exception callback used for the worker tasks built by Start.
  function TBackgroundWorkers.OnException(aTaskProc: TTaskExceptionProc): TBackgroundWorkers;
  begin
    fWorkerExceptionProc := aTaskProc;
    Exit(Self);
  end;
  // Fluent: stores the initialize callback used for the worker tasks built by Start.
  function TBackgroundWorkers.OnInitialize(aTaskProc: TTaskProc): TBackgroundWorkers;
  begin
    fWorkerInitProc := aTaskProc;
    Exit(Self);
  end;
  // Fluent: stores the retry callback used for the worker tasks built by Start.
  function TBackgroundWorkers.OnRetry(aTaskProc: TTaskRetryProc): TBackgroundWorkers;
  begin
    fWorkerRetryProc := aTaskProc;
    Exit(Self);
  end;
  // Fluent: stores the terminate callback used for the worker tasks built by Start.
  function TBackgroundWorkers.OnTerminated(aTaskProc: TTaskProc): TBackgroundWorkers;
  begin
    fWorkerTerminateProc := aTaskProc;
    Exit(Self);
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with wait 0 and
  // multiplier 1.
  function TBackgroundWorkers.Retry(aMaxRetries: Integer): TBackgroundWorkers;
  begin
    SetRetryPolicy(aMaxRetries,0,1);
    Exit(Self);
  end;
  // Fluent: sets the retry policy with max retries -1, wait 0 and multiplier 1.
  function TBackgroundWorkers.RetryForever: TBackgroundWorkers;
  begin
    SetRetryPolicy(-1,0,1);
    Exit(Self);
  end;
  // Copies the three retry settings into the fault policy object.
  procedure TBackgroundWorkers.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double);
  begin
    with fFaultPolicy do
    begin
      MaxRetries := aMaxRetries;
      WaitTimeBetweenRetries := aWaitTimeBetweenRetriesMS;
      WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
    end;
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with the given wait
  // time (ms) and wait-time multiplier factor.
  // The body was empty, which left Result undefined (breaking any fluent
  // chain) and ignored the arguments; it now mirrors Retry/RetryForever.
  function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  begin
    Result := Self;
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  end;
  // Fluent: intended to set the retry policy from an array of wait times.
  // The body was empty, which left Result undefined and broke any fluent
  // chain; it now at least returns the instance.
  // NOTE(review): aWaitTimeArray is still not applied — this class only shows
  // a three-value SetRetryPolicy; TODO wire the array into the fault policy.
  function TBackgroundWorkers.WaitAndRetry(aWaitTimeArray: TArray<Integer>): TBackgroundWorkers;
  begin
    Result := Self;
  end;
  // Fluent: sets the retry policy to aMaxRetries retries with the given wait
  // time (ms) and multiplier 1.
  // The body was empty, which left Result undefined (breaking any fluent
  // chain) and ignored the arguments; it now mirrors Retry/RetryForever.
  function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  begin
    Result := Self;
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  end;
  // Fluent: sets the retry policy with max retries -1 (the value RetryForever
  // uses), the given wait time (ms) and multiplier 1.
  // The body was empty, which left Result undefined (breaking any fluent
  // chain) and ignored the argument.
  function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  begin
    Result := Self;
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  end;
  // Fluent: sets the retry policy with max retries -1 (the value RetryForever
  // uses), the given wait time (ms) and wait-time multiplier factor.
  // The body was empty, which left Result undefined (breaking any fluent
  // chain) and ignored the arguments.
  function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  begin
    Result := Self;
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  end;
  2170. end.