Quick.Threads.pas 65 KB

  1. { ***************************************************************************
  2. Copyright (c) 2016-2020 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 27/06/2020
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  // TThreadedQueueCS<T>: generic queue whose items are stored in a dynamic array (FQueue).
  // The class declares a critical section (FQueueLock) and a condition variable
  // (FQueueCondVar; a TEventObject when compiled with FPC).
  // NOTE(review): the method bodies are outside this view, so the exact blocking and
  // timeout behaviour of PushItem/PopItem should be confirmed in the implementation section.
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal; // exposed read-only through TotalItemsPushed / TotalItemsPopped
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); // defaults: depth 16, both timeouts INFINITE
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload; // AQueueSize is a var (in/out) parameter
  62. function PopItem: T; overload; // returns the item directly; no TWaitResult is reported by this overload
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize; // read-only view of FQueueSize
  69. property ShutDown: Boolean read FShutDown; // read-only view of FShutDown
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  // TThreadedQueueList<T>: generic queue with the same public method set as TThreadedQueueCS<T>
  // except that no Clear method is declared here. Storage is a TQueue<T> (fQueue) instead of
  // a dynamic array, and the FPC condition variable type is TSimpleEvent.
  // NOTE(review): blocking/timeout semantics are defined in the implementation, not visible here.
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); // defaults: depth 10, both timeouts INFINITE
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize; // same field as fQueueSize (Pascal identifiers are case-insensitive)
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  // TThreadObjectList<T>: list of class instances, compiled only when FPC is not defined.
  // It derives from TList<T> and additionally holds an inner TObjectList<T> (fList) that
  // LockList hands out, plus a lock object (fLock).
  // NOTE(review): presumably fLock is used as a monitor and LockList/UnlockList must be
  // paired by the caller - confirm in the implementation section.
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default; // default array property backed by GetItem/SetItem
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  // Callback types used by the thread classes below.
  // Delphi branch: TAnonExceptionProc is an anonymous-method reference and TAnonProc aliases TProc.
  // FPC branch: TProc and TAnonExceptionProc are method pointers ("of object"); TAnonProc is
  // not declared in this branch.
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException); // enumeration of thread work states
  // TAdvThread: TThread descendant that stores an execute callback, an exception callback and
  // a terminate callback, each with an optional "with sync" flag for execute and terminate.
  // NOTE(review): the "sync" flags presumably route the callback through TThread.Synchronize;
  // the implementation is not visible here - confirm.
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override; // overrides TThread.DoTerminate
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc); // registers the exception callback
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean); // registers the terminate callback and its sync flag
  148. procedure Execute; override;
  149. end;
  // IAnonymousThread: fluent interface for configuring and starting an anonymous thread.
  // The On* methods return IAnonymousThread so calls can be chained before Start.
  // NOTE(review): no GUID is declared on this interface.
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  // TAnonymousThread: reference-counted (TInterfacedObject) implementation of IAnonymousThread
  // that wraps a TAdvThread. The constructor is private; instances are obtained through the
  // Execute / Execute_Sync class functions, which return the interface.
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  // TParamValue: one named task parameter. Holds a name, a TFlexValue and an "owned" flag.
  // NOTE(review): presumably Owned controls whether Destroy frees an object held in Value;
  // the destructor body is not visible here - confirm.
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload; // accepts a TVarRec (element of an "array of const")
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  // Supporting types for tasks: the parameter list alias, status/schedule enumerations and
  // the task-related exception classes (all direct Exception descendants).
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds); // unit used together with an interval in RepeatEvery
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  // ITask: base task interface. Exposes parameters (by index or by name), worker number,
  // task id, result value, lifecycle hooks (DoExecute / DoException / DoTerminate),
  // enable/disable, and status/retry query functions.
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue; // returns a pointer (PFlexValue) rather than a TFlexValue copy
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. procedure Enable;
  209. procedure Disable;
  // Delphi branch: Param is declared twice as a default array property (indexed by Integer
  // and by name). FPC branch: the by-name property is called ParamByName and is the only default.
  210. {$IFNDEF FPC}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  212. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ELSE}
  214. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  215. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  216. {$ENDIF}
  217. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  218. property Result : TFlexValue read GetResult write SetResult;
  219. property IdTask : Int64 read GetIdTask; // read-only property; SetIdTask is a separate method
  220. function Done : Boolean;
  221. function Failed : Boolean;
  222. function NumRetries : Integer;
  223. function MaxRetries : Integer;
  224. function LastException : Exception;
  225. function CircuitBreaked : Boolean;
  226. function IsEnabled : Boolean;
  227. end;
  // Task callback types. Delphi branch: anonymous-method references. FPC branch: method
  // pointers ("of object") with the same parameter lists.
  // TTaskRetryProc receives aStopRetries as a var parameter, so the callback can write to it.
  228. {$IFNDEF FPC}
  229. TTaskProc = reference to procedure(task : ITask);
  230. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  231. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  232. {$ELSE}
  233. TTaskProc = procedure(task : ITask) of object;
  234. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  235. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  236. {$ENDIF}
  // IWorkTask: ITask extension with a fluent configuration API. Every function returns
  // IWorkTask so calls can be chained; Run is the only procedure.
  // NOTE(review): unlike ITask and IScheduledTask, no GUID is declared on this interface.
  237. IWorkTask = interface(ITask)
  238. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  239. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  240. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  241. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  242. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  243. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  244. function Retry(aMaxRetries : Integer) : IWorkTask;
  245. function RetryForever : IWorkTask;
  246. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  247. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload; // NOTE(review): presumably one wait time per retry, in ms - confirm
  248. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  249. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  250. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  251. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  252. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  253. procedure Run;
  254. end;
  // IScheduledTask: ITask extension for scheduled work. Adds the same fluent callback/retry
  // configuration as IWorkTask (returning IScheduledTask), expiry callbacks, start-time
  // selection (Start*), repetition selection (RunOnce / RepeatEvery*), and Cancel / IsFinished.
  255. IScheduledTask = interface(ITask)
  256. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  257. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  259. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  260. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  261. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  262. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  263. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  264. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  265. function Retry(aMaxRetries : Integer) : IScheduledTask;
  266. function RetryForever : IScheduledTask;
  267. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  268. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  269. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  270. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  271. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  272. function CheckSchedule : Boolean;
  273. procedure DoExpire;
  274. function GetTaskName : string;
  275. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  276. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask; // aSecond defaults to 0
  277. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask; // aSecond defaults to 0
  278. function StartOnDayChange : IScheduledTask;
  279. function StartNow : IScheduledTask;
  280. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  281. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // The repetition selectors below are procedures, so they end a fluent call chain.
  282. procedure RunOnce;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  284. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  285. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  286. procedure RepeatEveryDay;
  287. procedure RepeatEveryWeek;
  288. function IsFinished : Boolean;
  289. procedure Cancel;
  290. property Name : string read GetTaskName; // read-only through the interface
  291. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  292. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  293. end;
  // TTask: reference-counted base class implementing ITask. Holds the task id, worker number,
  // parameter list, the callbacks (initialize / execute / exception / terminate / expired /
  // retry), sync flags, a TFaultControl instance and the result value.
  // The ITask accessor and lifecycle methods are declared private, so they are reached
  // through the interface rather than through a TTask reference.
  294. TTask = class(TInterfacedObject,ITask)
  295. private
  296. fIdTask : Int64;
  297. fNumWorker : Integer;
  298. fNumRetries : Integer;
  299. fParamList : TParamList;
  300. fInitializeProc : TTaskProc;
  301. fExecuteProc : TTaskProc;
  302. fExceptProc : TTaskExceptionProc;
  303. fTerminateProc : TTaskProc;
  304. fExpiredProc : TTaskProc;
  305. fTaskStatus : TWorkTaskStatus;
  306. fOwnedParams : Boolean;
  307. fEnabled : Boolean;
  308. fExecuteWithSync : Boolean;
  309. fExceptionWithSync : Boolean;
  310. fRetryProc : TTaskRetryProc;
  311. fTerminateWithSync : Boolean;
  312. fFaultControl : TFaultControl;
  313. fCustomFaultPolicy : Boolean;
  314. fResult : TFlexValue;
  315. function GetParam(aIndex : Integer) : TFlexValue; overload;
  316. function GetParam(const aName : string) : TFlexValue; overload;
  317. function GetParam2(aIndex : Integer) : PFlexValue;
  318. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  319. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  320. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload; // extra overload not present in ITask
  321. procedure DoInitialize;
  322. procedure DoExecute;
  323. procedure DoException(aException : Exception);
  324. procedure DoTerminate;
  325. function GetNumWorker : Integer;
  326. procedure SetNumWorker(Value : Integer);
  327. function GetIdTask : Int64;
  328. procedure SetIdTask(Value : Int64);
  329. function GetResult : TFlexValue;
  330. procedure SetResult(aValue : TFlexValue);
  331. protected
  // Protected members give descendants access to the fault-control object and sync flags.
  332. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  333. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  334. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  335. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  336. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  337. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  338. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  339. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  340. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  341. public
  342. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual; // virtual constructor
  343. destructor Destroy; override;
  344. property IdTask : Int64 read GetIdTask;
  345. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  346. function IsEnabled : Boolean;
  347. function TaskStatus : TWorkTaskStatus;
  348. function Done : Boolean;
  349. function Failed : Boolean;
  350. function NumRetries : Integer;
  351. function MaxRetries : Integer;
  352. function LastException : Exception;
  353. function CircuitBreaked : Boolean;
  354. procedure Disable;
  355. procedure Enable;
  356. end;
  // TWorkTask: TTask descendant implementing IWorkTask. The callback-registration methods
  // (OnException*, OnTerminated*, OnRetry) and Run are virtual; OnInitialize, the retry
  // configuration methods and SetParameter are not.
  357. TWorkTask = class(TTask,IWorkTask)
  358. public
  359. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  360. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  361. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  362. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  363. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  364. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  365. function Retry(aMaxRetries : Integer) : IWorkTask;
  366. function RetryForever : IWorkTask;
  367. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  368. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  369. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  370. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  371. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  372. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  373. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  374. procedure Run; virtual;
  375. end;
  // TTaskQueue: the array-backed TThreadedQueueCS specialised for IWorkTask references.
  376. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  // TScheduledTask: TTask descendant implementing IScheduledTask. Adds the schedule state
  // (mode, interval and unit, start / last / next execution times, expiration date and
  // count, finished flag) and the expiry sync flag.
  // NOTE(review): how CheckSchedule evaluates these fields is defined in the implementation
  // section, which is not visible here.
  377. TScheduledTask = class(TTask,IScheduledTask)
  378. private
  379. fName : string;
  380. fExecutionTimes : Integer;
  381. fScheduleMode : TScheduleMode;
  382. fTimeInterval : Integer;
  383. fTimeMeasure : TTimeMeasure;
  384. fStartDate : TDateTime;
  385. fLastExecution : TDateTime;
  386. fNextExecution : TDateTime;
  387. fExpirationDate : TDateTime;
  388. fExpirationTimes : Integer;
  389. fFinished : Boolean;
  390. fExpireWithSync: Boolean;
  391. procedure ClearSchedule;
  392. function CheckSchedule : Boolean;
  393. procedure DoExpire;
  394. function GetTaskName : string;
  395. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  396. protected
  397. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  398. public
  399. property Name : string read fName write fName; // read/write on the class; the interface exposes Name read-only
  400. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  401. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule; // pair of time unit and Integer value
  402. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  403. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  404. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  405. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  406. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  407. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  408. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  409. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  410. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  411. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  412. function StartOnDayChange : IScheduledTask;
  413. function StartNow : IScheduledTask;
  414. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  415. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  416. procedure RunOnce;
  417. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  418. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  419. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  420. procedure RepeatEveryDay;
  421. procedure RepeatEveryWeek;
  422. function Retry(aMaxRetries : Integer) : IScheduledTask;
  423. function RetryForever : IScheduledTask;
  424. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  425. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  426. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  427. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  428. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  429. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  430. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  431. function IsFinished : Boolean;
  432. procedure Cancel;
  433. end;
  // TWorkerStatus: state reported by TWorker.Status.
  434. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  // TWorker: base TThread descendant for the worker classes below. Keeps the current
  // status, the task being processed (as ITask) and a default TFaultPolicy.
  435. TWorker = class(TThread)
  436. protected
  437. fStatus : TWorkerStatus;
  438. fCurrentTask : ITask;
  439. fDefaultFaultPolicy : TFaultPolicy;
  440. procedure ExecuteTask;
  441. procedure TerminateTask;
  442. public
  443. constructor Create;
  444. destructor Destroy; override;
  445. property Status : TWorkerStatus read fStatus; // read-only
  446. procedure SetFaultPolicy(aTask : TTask); // takes the concrete TTask class, not the ITask interface
  447. procedure Execute; override;
  448. end;
  // TSimpleWorker: TWorker descendant constructed with a single ITask and its own Execute override.
  449. TSimpleWorker = class(TWorker)
  450. public
  451. constructor Create(aTask : ITask);
  452. procedure Execute; override;
  453. end;
  // TQueueWorker: TWorker descendant constructed with a worker number and a TTaskQueue.
  // fCurrentIdTask is declared Int64 so that a task id obtained from ITask.GetIdTask
  // (declared as Int64, like TTask.fIdTask) can be stored without narrowing to 32 bits.
  454. TQueueWorker = class(TWorker)
  455. private
  456. fCurrentIdTask : Int64;
  457. fNumWorker : Integer;
  458. fTaskQueue : TTaskQueue;
  459. public
  460. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  461. property NumWorker : Integer read fNumWorker; // read-only
  462. procedure Execute; override;
  463. end;
  // TScheduledWorker: TWorker descendant constructed with a worker number and one
  // IScheduledTask. Declares a private ExpireTask helper in addition to its Execute override.
  464. TScheduledWorker = class(TWorker)
  465. private
  466. procedure ExpireTask;
  467. public
  468. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  469. procedure Execute; override;
  470. end;
  // TWorkerPool: object list of TWorker instances.
  471. TWorkerPool = TObjectList<TWorker>;
  // TRunTask: class-function-only entry points that build an IWorkTask from a task procedure,
  // optionally with a parameter array. The _Sync variants mirror the plain ones.
  // NOTE(review): presumably _Sync runs the task procedure synchronized with the main
  // thread - confirm in the implementation section.
  472. TRunTask = class
  473. public
  474. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  475. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  476. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  477. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  478. end;
  // Queue of work tasks consumed by a pool of TQueueWorker threads.
  TBackgroundTasks = class
  private
    fMaxQueue : Integer;
    fWorkerPool : TWorkerPool;
    fConcurrentWorkers : Integer;
    fInsertTimeout,
    fExtractTimeout : Cardinal;
    fTaskQueue : TTaskQueue;
    fNumPushedTasks : Int64;
    function GetTaskQueue : Cardinal;
  public
    constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
    destructor Destroy; override;
    // 0 means no limit is enforced by AddTask.
    property MaxQueue : Integer read fMaxQueue write fMaxQueue;
    property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
    // Current number of items in the task queue.
    property TaskQueued : Cardinal read GetTaskQueue;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    // Number of workers created by Start.
    property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
    function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    // (Re)creates the worker pool and starts every worker.
    procedure Start;
    // Clears the pending task queue.
    procedure CancelAll;
  end;
  // List of scheduled task interfaces shared by TScheduledTasks and TScheduler.
  TScheduledTaskList = TList<IScheduledTask>;
  // Thread that works over a list of scheduled tasks (implementation is
  // outside this section of the file).
  TScheduler = class(TThread)
  private
    fListLock : TCriticalSection;
    fCondVar : TSimpleEvent;
    fTaskList : TScheduledTaskList;
    fRemoveTaskAfterExpiration : Boolean;
  public
    constructor Create(aTaskList : TScheduledTaskList);
    destructor Destroy; override;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    procedure Execute; override;
    function Add(aTask : TScheduledTask) : Integer;
    // Task lookup by id or by name.
    function Get(aIdTask : Int64) : IScheduledTask; overload;
    function Get(const aTaskName : string) : IScheduledTask; overload;
  end;
  // Front-end for registering scheduled tasks and starting/stopping
  // their scheduler thread.
  TScheduledTasks = class
  private
    fTaskList : TScheduledTaskList;
    fScheduler : TScheduler;
    fNumPushedTasks : Int64;
    fRemoveTaskAfterExpiration : Boolean;
    fIsStarted : Boolean;
    fFaultPolicy : TFaultPolicy;
  public
    constructor Create;
    destructor Destroy; override;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    property IsStarted : Boolean read fIsStarted;
    property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
    function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    // Task lookup by id or by name.
    function GetTask(aIdTask : Int64) : IScheduledTask; overload;
    function GetTask(const aTaskName : string) : IScheduledTask; overload;
    procedure Start;
    procedure Stop;
  end;
  545. implementation
  546. { TThreadedQueueCS<T> }
  // Removes every queued item and wakes all waiters.
  // When T is exactly TObject the stored instances are disposed first.
  // The buffer keeps its previous depth and the ring counters are reset,
  // so the queue remains usable after the call (previously the array was
  // left with length 0 while FQueueSize/FQueueOffset kept stale values).
  procedure TThreadedQueueCS<T>.Clear;
  var
    obj : T;
    depth : Integer;
  begin
    FQueueLock.Enter;
    try
      for obj in FQueue do
      begin
        if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
      end;
      depth := Length(FQueue);
      //shrinking to 0 finalizes every slot (releases managed references)
      SetLength(FQueue,0);
      //re-allocate empty slots with the same capacity
      SetLength(FQueue,depth);
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Builds a queue with AQueueDepth slots and the given push/pop timeouts.
  // Raises Exception when AQueueDepth is lower than 10.
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    if AQueueDepth < 10 then
      raise Exception.Create('QueueDepth will be 10 or greater value');
    SetLength(FQueue, AQueueDepth);
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    //FPC: named manual-reset event plays the role of the condition variable
    FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
  end;
  // Signals shutdown to waiters and releases the synchronization objects.
  // The destructor also runs when the constructor raised (queue depth
  // check) before the sync objects existed, so DoShutDown is only called
  // when the condition variable was actually created.
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    if Assigned(FQueueCondVar) then DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  // Changes the buffer capacity by ADelta slots and wakes all waiters.
  // The buffer is a ring: when the stored items wrap around the end of
  // the array, plain enlargement would leave a gap between the tail
  // segment and the wrapped head segment. In that case the tail segment
  // [FQueueOffset..old end] is moved to the new end of the array so the
  // logical order of the items is preserved.
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    oldLen : Integer;
    idx : Integer;
  begin
    FQueueLock.Enter;
    try
      oldLen := Length(FQueue);
      SetLength(FQueue, oldLen + ADelta);
      if (ADelta > 0) and (FQueueSize > 0) and (FQueueOffset + FQueueSize > oldLen) then
      begin
        //walk downwards so a destination never overwrites an unmoved item
        for idx := oldLen - 1 downto FQueueOffset do
        begin
          FQueue[idx + ADelta] := FQueue[idx];
          FQueue[idx] := Default(T);
        end;
        Inc(FQueueOffset, ADelta);
      end;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Pops the next item, discarding the wait result and the queue size.
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  // Waits (up to FPopTimeout per wait) until an item is available or the
  // queue is shut down, then extracts the item at the ring head.
  // AItem is Default(T) when nothing was extracted; AQueueSize always
  // receives the queue size on exit.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  var
    wasFull : Boolean;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPopTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;

      //nothing to deliver: shut down with empty queue, or wait failed
      if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;

      AItem := FQueue[FQueueOffset];
      FQueue[FQueueOffset] := Default(T);

      //queue was full before this pop: wake waiting pushers
      wasFull := FQueueSize = Length(FQueue);
      if wasFull then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;

      Dec(FQueueSize);
      Inc(FQueueOffset);
      Inc(FTotalItemsPopped);
      //wrap ring head
      if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Pops the next item into AItem and returns the wait result.
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops the next item and reports the resulting queue size in AQueueSize.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem, discarding the resulting queue size.
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  // Appends AItem at the ring tail. The buffer is enlarged when it is
  // (almost) full; if it is still full the call waits up to FPushTimeout
  // per wait. Nothing is stored after shutdown or a failed wait.
  // AQueueSize always receives the queue size on exit.
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    tailIdx : Integer;
  begin
    FQueueLock.Enter;
    try
      //grow policy: double below 1024 items, otherwise add 50%
      if FQueueSize >= High(FQueue) then
      begin
        if FQueueSize < 1024 then
          Grow(FQueueSize)
        else
          Grow(FQueueSize Div 2);
      end;

      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPushTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
        {$ENDIF}
      end;

      if FShutDown or (Result <> wrSignaled) then Exit;

      //queue was empty: wake waiting poppers
      if FQueueSize = 0 then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;

      tailIdx := (FQueueOffset + FQueueSize) mod Length(FQueue);
      FQueue[tailIdx] := AItem;
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Flags the queue as shut down and wakes every waiting thread.
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FShutDown := True;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  702. { TThreadedQueueList<T> }
  // Builds a TQueue<T>-backed threaded queue with an initial capacity of
  // AQueueDepth and the given push/pop timeouts.
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    fQueueSize := 0;
    fPushTimeout := PushTimeout;
    fPopTimeout := PopTimeout;
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    fQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create;
    {$ELSE}
    fQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  // Signals shutdown, then frees the lock, the condition object and the
  // underlying queue.
  destructor TThreadedQueueList<T>.Destroy;
  begin
    DoShutDown;
    fQueueLock.Free;
    fQueueCondVar.Free;
    fQueue.Free;
    inherited;
  end;
  // Increases the capacity of the underlying queue by ADelta and
  // signals the condition object.
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newCapacity : Integer;
  begin
    fQueueLock.Enter;
    try
      newCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := newCapacity;
    finally
      fQueueLock.Leave;
    end;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  // Pops the next item, discarding the wait result and the queue size.
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  {$IFDEF FPC}
  // FPC variant: waits on the event WITHOUT holding fQueueLock, then takes
  // the lock only to extract the item. AItem is Default(T) when nothing
  // was extracted; AQueueSize receives the queue size on exit.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FPopTimeout);
      end;

      fQueueLock.Enter;
      try
        if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
        AItem := fQueue.Extract;
        Dec(FQueueSize);
        Inc(fTotalItemsPopped);
      finally
        fQueueLock.Leave;
      end;
    finally
      AQueueSize := fQueueSize;
    end;
  end;
  {$ELSE}
  // Delphi variant: waits on the condition variable (releasing fQueueLock
  // while waiting) until an item is available or the queue is shut down.
  // AItem is Default(T) when nothing was extracted; AQueueSize receives
  // the queue size on exit.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
      end;

      if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;

      AItem := fQueue.Extract;
      //compares the not-yet-decremented counter with the queue count
      if fQueueSize = fQueue.Count then
      begin
        FQueueCondVar.ReleaseAll;
      end;
      Dec(FQueueSize);
      Inc(fTotalItemsPopped);
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Pops the next item into AItem and returns the wait result.
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops the next item and reports the resulting queue size in AQueueSize.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem, discarding the resulting queue size.
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  {$IFDEF FPC}
  // FPC variant: enqueues AItem under the lock. The only event signal on
  // this path comes from Grow, which is called whenever the size counter
  // is >= the queue count. Nothing is stored after shutdown.
  // AQueueSize receives the queue size on exit.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      if FQueueSize >= fQueue.Count then Grow(10);
      Result := wrSignaled;
      //no waiting for free space in this variant: Result stays wrSignaled
      if fShutDown or (Result <> wrSignaled) then Exit;
      fQueue.Enqueue(AItem);
      Inc(FQueueSize);
      Inc(fTotalItemsPushed);
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ELSE}
  // Delphi variant: enqueues AItem under the lock and wakes waiting
  // poppers when the queue was empty. Nothing is stored after shutdown.
  // AQueueSize receives the queue size on exit.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      //no waiting for free space in this variant: Result stays wrSignaled
      if fShutDown or (Result <> wrSignaled) then Exit;
      if fQueueSize = 0 then
        FQueueCondVar.ReleaseAll;
      fQueue.Enqueue(AItem);
      Inc(FQueueSize);
      Inc(fTotalItemsPushed);
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Flags the queue as shut down and wakes every waiting thread.
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fShutDown := True;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  876. {$IFNDEF FPC}
  877. { TThreadObjectList<T> }
  // Adds Item under the list lock, honouring the Duplicates setting:
  // duplicates are added (dupAccept), silently skipped (dupIgnore) or
  // rejected with EListError (dupError).
  procedure TThreadObjectList<T>.Add(const Item: T);
  begin
    LockList;
    try
      if (Duplicates <> dupAccept) and (fList.IndexOf(Item) <> -1) then
      begin
        //item already present and duplicates are not accepted
        if Duplicates = dupError then
          raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
        Exit;
      end;
      fList.Add(Item);
    finally
      UnlockList;
    end;
  end;
  // Clears the inner list while holding the list lock.
  procedure TThreadObjectList<T>.Clear;
  var
    inner : TObjectList<T>;
  begin
    inner := LockList;
    try
      inner.Clear;
    finally
      UnlockList;
    end;
  end;
  // Creates the lock object and the inner object list.
  // OwnedObjects is forwarded to the inner TObjectList<T>, so the list
  // frees removed objects only when requested (previously the parameter
  // was ignored and the list always owned its objects).
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  // Takes the list lock before freeing the inner list; the lock object
  // itself is released and freed last.
  destructor TThreadObjectList<T>.Destroy;
  begin
    LockList;
    try
      fList.Free;
      inherited Destroy;
    finally
      UnlockList;
      fLock.Free;
    end;
  end;
  // Returns the element at aIndex, read under the list lock.
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    inner : TObjectList<T>;
  begin
    inner := LockList;
    try
      Result := inner[aIndex];
    finally
      UnlockList;
    end;
  end;
  // Enters the monitor on fLock and returns the inner list.
  // Every call must be paired with UnlockList.
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;
  // Removes Item, searching from the beginning of the list.
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    RemoveItem(Item, TDirection.FromBeginning);
  end;
  // Removes Item from the inner list under the list lock, searching in
  // the given Direction.
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    inner : TObjectList<T>;
  begin
    inner := LockList;
    try
      inner.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  // Stores aValue at aIndex under the list lock.
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    inner : TObjectList<T>;
  begin
    inner := LockList;
    try
      inner[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  // Leaves the monitor entered by LockList.
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  955. {$ENDIF}
  956. { TAnonymousThread }
  // Wraps aProc in a TAdvThread; aSynchronize is forwarded to it.
  constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    fThread := TAdvThread.Create(aProc, aSynchronize);
  end;
  // Creates an anonymous thread for aProc with the synchronize flag off.
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc, False);
  end;
  // Creates an anonymous thread for aProc with the synchronize flag on.
  class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc, True);
  end;
  // Registers the exception callback on the inner thread (fluent).
  function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  begin
    fThread.OnException(aProc);
    Result := Self;
  end;
  // Registers the termination callback, not synchronized (fluent).
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc, False);
    Result := Self;
  end;
  // Registers the termination callback with the synchronize flag on (fluent).
  function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc, True);
    Result := Self;
  end;
  // Starts the inner thread.
  procedure TAnonymousThread.Start;
  begin
    fThread.Start;
  end;
  988. { TTask }
  // Creates a pending, disabled task for aTaskProc. Every element of
  // aParamArray becomes a parameter named after its index; aOwnedParams
  // is stored and passed to each created parameter.
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    //initial state
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fEnabled := False;
    fNumRetries := 0;
    fCustomFaultPolicy := False;
    fExecuteWithSync := False;
    fTerminateWithSync := False;
    fExceptionWithSync := False;
    fExecuteProc := aTaskProc;
    //fault control
    fFaultControl := TFaultControl.Create;
    fFaultControl.OnRetry := DoRetry;
    //parameters
    fOwnedParams := aOwnedParams;
    fParamList := TParamList.Create(True);
    for idx := Low(aParamArray) to High(aParamArray) do
    begin
      fParamList.Add(TParamValue.Create(idx.ToString,aParamArray[idx],aOwnedParams));
      {$IFDEF FPC}
      fParamList[idx].Value._AddRef;
      {$ENDIF}
    end;
  end;
  // Frees the fault control, the parameter list and, when the task
  // result holds an object, that object too.
  destructor TTask.Destroy;
  begin
    fFaultControl.Free;
    fParamList.Free;
    if not fResult.IsNullOrEmpty then
    begin
      if fResult.IsObject then fResult.AsObject.Free;
    end;
    inherited;
  end;
  // Clears the enabled flag.
  procedure TTask.Disable;
  begin
    fEnabled := False;
  end;
  // Marks the task as failed with exception and forwards aException to
  // the registered handler; without a handler the exception is raised.
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    if not Assigned(fExceptProc) then raise aException;
    fExceptProc(Self,aException);
  end;
  // Runs the task procedure, repeating it while the fault control asks
  // for a retry. Exceptions raised by the procedure are acquired and
  // handed to the fault control instead of propagating.
  procedure TTask.DoExecute;
  begin
    fTaskStatus := TWorkTaskStatus.wtsRunning;
    DoInitialize;
    repeat
      try
        if Assigned(fExecuteProc) then
          fExecuteProc(Self);
        fTaskStatus := TWorkTaskStatus.wtsDone;
        fFaultControl.SuccessExecution;
      except
        on E : Exception do
        begin
          fTaskStatus := TWorkTaskStatus.wtsException;
          //take ownership of the exception object for the fault control
          {$IFNDEF FPC}
            {$IF DELPHIRX10_UP}
            fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
            {$ELSE}
            fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
            {$ENDIF}
          {$ELSE}
          fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
          {$ENDIF}
        end;
      end;
    until not fFaultControl.NeedToRetry;
  end;
  // Resets the fault control and runs the optional initialization
  // callback. Any exception is converted to ETaskInitializationError.
  procedure TTask.DoInitialize;
  begin
    try
      fFaultControl.Reset;
      if Assigned(fInitializeProc) then
        fInitializeProc(Self);
    except
      on E : Exception do
        raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
    end;
  end;
  // True when the fault control does not report the task as failed.
  function TTask.Done: Boolean;
  var
    hasFailed : Boolean;
  begin
    hasFailed := fFaultControl.TaskFailed;
    Result := not hasFailed;
  end;
  // True when the fault control reports the task as failed.
  function TTask.Failed: Boolean;
  begin
    Result := fFaultControl.TaskFailed;
  end;
  // Returns the CircuitBreaked state of the fault control.
  function TTask.CircuitBreaked: Boolean;
  begin
    Result := fFaultControl.CircuitBreaked;
  end;
  // Returns the last exception recorded by the fault control.
  function TTask.LastException: Exception;
  begin
    Result := fFaultControl.LastException;
  end;
  // Returns the maximum number of retries set in the fault control.
  function TTask.MaxRetries: Integer;
  begin
    Result := fFaultControl.MaxRetries;
  end;
  // Returns the retry count reported by the fault control.
  function TTask.NumRetries: Integer;
  begin
    Result := fFaultControl.NumRetries;
  end;
  // Retry hook assigned to the fault control: retries continue unless
  // the optional user callback sets vStopRetries.
  procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  begin
    vStopRetries := False;
    if not Assigned(fRetryProc) then Exit;
    fRetryProc(Self,aRaisedException,vStopRetries);
  end;
  // Invokes the termination callback when one is registered.
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  // Sets the enabled flag.
  procedure TTask.Enable;
  begin
    fEnabled := True;
  end;
  // Returns the task identifier.
  function TTask.GetIdTask: Int64;
  begin
    Result := fIdTask;
  end;
  // Copies retry settings (max retries, wait time, multiplier factor)
  // from aFaultPolicy into the fault control.
  procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  begin
    {$IFDEF FPC}
    //FPC only: create the fault control on demand
    if not Assigned(fFaultControl) then
      fFaultControl := TFaultControl.Create;
    {$ENDIF}
    fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
    fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  end;
  // Stores the task identifier.
  procedure TTask.SetIdTask(Value : Int64);
  begin
    fIdTask := Value;
  end;
  // Returns the worker number stored by SetNumWorker.
  function TTask.GetNumWorker: Integer;
  begin
    Result := fNumWorker;
  end;
  // Returns the value of the parameter stored at aIndex.
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  var
    param : TParamValue;
  begin
    param := fParamList[aIndex];
    Result := param.Value;
  end;
  // Returns the value of the first parameter whose name matches aName
  // (case-insensitive). Raises ETaskParamError when there is none.
  function TTask.GetParam(const aName: string): TFlexValue;
  var
    idx : Integer;
  begin
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) = 0 then Exit(fParamList[idx].Value);
    end;
    //no parameter with that name
    raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  end;
  // Returns a pointer to the value of the parameter stored at aIndex.
  function TTask.GetParam2(aIndex: Integer): PFlexValue;
  begin
    Result := @fParamList[aIndex].Value;
  end;
  // Returns the task result value.
  function TTask.GetResult: TFlexValue;
  begin
    Result := fResult;
  end;
  // Returns the enabled flag.
  function TTask.IsEnabled: Boolean;
  begin
    Result := fEnabled;
  end;
  // Stores the worker number and marks the task as assigned.
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fNumWorker := Value;
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
  end;
  // Replaces the value of the parameter stored at aIndex.
  // Raises ETaskParamError when aIndex is outside 0..Count-1 (the former
  // check used ">" and therefore let aIndex = Count and negative indexes
  // reach the list indexer).
  procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  begin
    if (aIndex < 0) or (aIndex >= fParamList.Count) then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
    fParamList[aIndex].Value := Value;
  end;
  // Sets the parameter named aName (case-insensitive match). An existing
  // parameter only gets its value replaced; otherwise a new parameter is
  // appended with the given aOwned flag.
  procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  var
    idx : Integer;
  begin
    //update in place when the name is already known
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) <> 0 then Continue;
      fParamList[idx].Value := Value;
      Exit;
    end;
    //unknown name: append a new parameter
    fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  end;
  // Sets the parameter named aName with the owned flag off.
  procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  const
    NOT_OWNED = False;
  begin
    SetParam(aName,Value,NOT_OWNED);
  end;
  // Configures retries in the fault control and flags the task as having
  // a custom fault policy.
  procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := aMaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
    fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  end;
  // Stores the task result value.
  procedure TTask.SetResult(aValue: TFlexValue);
  begin
    fResult := aValue;
  end;
  // Configures retries from an array of wait times: one retry per array
  // element. Flags the task as having a custom fault policy.
  procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
    fFaultControl.WaitTimeBetweenRetriesMS := 0;
    fFaultControl.WaitTimeMultiplierFactor := 1;
    fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  end;
  // Returns the current task status.
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Result := fTaskStatus;
  end;
  1210. { TWorkTask }
  // Registers the exception callback (fluent).
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Registers the exception callback and sets the exception-with-sync
  // flag (fluent).
  function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  begin
    Result := OnException(aTaskProc);
    fExceptionWithSync := True;
  end;
  // Registers the initialization callback (fluent).
  function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  // Registers the retry callback (fluent).
  function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  // Registers the termination callback (fluent).
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Registers the termination callback and sets the terminate-with-sync
  // flag (fluent).
  function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := OnTerminated(aTaskProc);
    fTerminateWithSync := True;
  end;
  // Marks the task as enabled.
  procedure TWorkTask.Run;
  begin
    fEnabled := True;
  end;
  // Sets a named parameter through SetParam(aName, aValue) (fluent).
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  begin
    SetParam(aName,aValue);
    Result := Self;
  end;
  // Sets a named parameter with an explicit owned flag (fluent).
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  begin
    SetParam(aName,aValue,aOwned);
    Result := Self;
  end;
  // Retry up to aMaxRetries times, wait time 0 and multiplier 1 (fluent).
  function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  // Sets the retry policy with max retries -1, wait time 0 and
  // multiplier 1 (fluent).
  function TWorkTask.RetryForever: IWorkTask;
  begin
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  // Retry up to aMaxRetries times with the given wait time and
  // multiplier 1 (fluent).
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Retry up to aMaxRetries times with the given wait time and
  // multiplier factor (fluent).
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Sets the retry policy from an array of wait times (fluent).
  function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  // Sets the retry policy with max retries -1, the given wait time and
  // multiplier 1 (fluent).
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Sets the retry policy with max retries -1, the given wait time and
  // multiplier factor (fluent).
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  1290. { TBackgroundTasks }
  // Clears the task queue.
  procedure TBackgroundTasks.CancelAll;
  begin
    fTaskQueue.Clear;
  end;
  // Stores the worker count and creates the task queue with
  // aInitialQueueSize slots, infinite insert timeout and a 5000 ms
  // extract timeout. Workers are not created here (see Start).
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fMaxQueue := 0;
    fNumPushedTasks := 0;
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  end;
  // Cancels pending tasks, shuts the queue down so blocked workers wake
  // up, then frees the worker pool and the queue.
  // The queue can be nil here when the constructor raised (for instance
  // an invalid initial queue size), so it is checked before any use.
  destructor TBackgroundTasks.Destroy;
  begin
    if Assigned(fTaskQueue) then
    begin
      CancelAll;
      fTaskQueue.DoShutDown;
    end;
    //Free is nil-safe; workers must go before the queue they read from
    fWorkerPool.Free;
    fTaskQueue.Free;
    inherited;
  end;
  // Returns the current size of the task queue.
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Result := fTaskQueue.QueueSize;
  end;
  // Queues aTaskProc without parameters.
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  const
    NOT_OWNED = False;
  begin
    Result := AddTask([],NOT_OWNED,aTaskProc);
  end;
  // Creates a work task for aTaskProc with the given parameters, assigns
  // it the next task id and pushes it to the queue.
  // Returns the task when the push was signaled, nil otherwise.
  // Raises ETaskAddError when MaxQueue > 0 and the queue already holds
  // MaxQueue items.
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    worktask : IWorkTask;
  begin
    //interface results are not guaranteed to start as nil
    Result := nil;
    if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    Inc(fNumPushedTasks);
    worktask.SetIdTask(fNumPushedTasks);
    if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
    begin
      Result := worktask;
    end;
  end;
  // Same as AddTask, additionally flagging the task to be executed
  // through Synchronize. AddTask only assigns its result when the push
  // was signaled, so the flag is set only on an actual task.
  function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
    if Result <> nil then TTask(Result).ExecuteWithSync := True;
  end;
  // Queues aTaskProc without parameters, flagged for synchronized execution.
  function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  const
    NOT_OWNED = False;
  begin
    Result := AddTask_Sync([],NOT_OWNED,aTaskProc);
  end;
  // Replaces any existing worker pool with a new owning pool and starts
  // fConcurrentWorkers queue workers numbered from 1.
  procedure TBackgroundTasks.Start;
  var
    num : Integer;
    queueWorker : TWorker;
  begin
    //Free is nil-safe, so no explicit nil test is needed
    fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    for num := 1 to fConcurrentWorkers do
    begin
      queueWorker := TQueueWorker.Create(num,fTaskQueue);
      fWorkerPool.Add(queueWorker);
      queueWorker.Start;
    end;
  end;
  1358. { TWorker }
  // Creates the thread suspended, with its own default fault policy and
  // without auto-free on termination.
  constructor TWorker.Create;
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fStatus := TWorkerStatus.wsSuspended;
    fDefaultFaultPolicy := TFaultPolicy.Create;
  end;
  // Frees the default fault policy.
  destructor TWorker.Destroy;
  begin
    //Free already checks for nil
    fDefaultFaultPolicy.Free;
    inherited;
  end;
  // Applies the worker default fault policy to aTask unless the task
  // carries a custom policy.
  procedure TWorker.SetFaultPolicy(aTask: TTask);
  begin
    if aTask.CustomFaultPolicy then Exit;
    aTask.SetFaultPolicy(fDefaultFaultPolicy);
  end;
  // Intentionally empty: descendants provide the thread body.
  procedure TWorker.Execute;
  begin
  end;
  // Runs the current task (parameterless so it can be passed to Synchronize).
  procedure TWorker.ExecuteTask;
  begin
    fCurrentTask.DoExecute;
  end;
  // Runs the termination step of the current task (parameterless so it
  // can be passed to Synchronize).
  procedure TWorker.TerminateTask;
  begin
    fCurrentTask.DoTerminate;
  end;
  1386. { TSimpleWorker }
  // Binds the worker to aTask; the thread frees itself on termination.
  constructor TSimpleWorker.Create(aTask : ITask);
  begin
    inherited Create;
    FreeOnTerminate := True;
    fCurrentTask := aTask;
  end;
  // Waits until the bound task is enabled, executes it once (through
  // Synchronize when the task asks for it), runs its termination step
  // and terminates the thread.
  // While the task is missing or not yet enabled the loop now sleeps
  // briefly instead of spinning at full CPU.
  procedure TSimpleWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      if (fCurrentTask = nil) or (not fCurrentTask.IsEnabled) then
      begin
        //nothing runnable yet: yield the CPU before polling again
        Sleep(1);
        Continue;
      end;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
            else fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
              else raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
          else fCurrentTask.DoTerminate;
        fStatus := TWorkerStatus.wsIdle;
        //single-shot worker: leave the loop after the first run
        Terminate;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1420. { TQueueWorker }
  // Stores the worker number and the queue this worker pops tasks from.
  constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  begin
    inherited Create;
    fTaskQueue := aTaskQueue;
    fNumWorker := aNumWorker;
  end;
  // Thread body: pops tasks from the queue until terminated. Each task
  // gets the fault policy applied, is executed (through Synchronize when
  // it asks for it) and always runs its termination step afterwards.
  procedure TQueueWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      fCurrentTask := fTaskQueue.PopItem;
      //PopItem yields nil when no task was delivered
      if fCurrentTask = nil then Continue;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fCurrentIdTask := fCurrentTask.GetIdTask;
          SetFaultPolicy(TTask(fCurrentTask));
          if TTask(fCurrentTask).ExecuteWithSync then
            Synchronize(ExecuteTask)
          else
            fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then
              fCurrentTask.DoException(E)
            else
              raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then
          Synchronize(TerminateTask)
        else
          fCurrentTask.DoTerminate;
        fStatus := TWorkerStatus.wsIdle;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1456. { TScheduledWorker }
  // Creates a self-freeing worker for one scheduled task. Outside
  // DELPHILINUX the thread is named after the task for debugging.
  // aNumWorker is not used in this constructor.
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create;
    {$IFNDEF DELPHILINUX}
    NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
    {$ENDIF}
    fCurrentTask := aScheduledTask;
    FreeOnTerminate := True;
  end;
  // Runs the scheduled task once: applies the fault policy, executes the
  // task, invokes its terminate callback and, when the task reports it is
  // finished, its expire callback. Each step goes through Synchronize when
  // the task requests it. The task reference is released at the end.
  procedure TScheduledWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    if Assigned(fCurrentTask) then
    begin
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          SetFaultPolicy(TTask(fCurrentTask));
          if TTask(fCurrentTask).ExecuteWithSync then
            Synchronize(ExecuteTask)
          else
            fCurrentTask.DoExecute;
          fStatus := TWorkerStatus.wsIdle;
        except
          on E : Exception do
          begin
            // without a task to notify, surface the failure as a task execution error
            if fCurrentTask = nil then
              raise ETaskExecutionError.Create(E.Message);
            fCurrentTask.DoException(E);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then
          Synchronize(TerminateTask)
        else
          fCurrentTask.DoTerminate;
        // fire the expire callback once the schedule reports it is finished
        if (fCurrentTask as IScheduledTask).IsFinished then
        begin
          if TScheduledTask(fCurrentTask).ExpireWithSync then
            Synchronize(ExpireTask)
          else
            (fCurrentTask as IScheduledTask).DoExpire;
        end;
      end;
    end;
    fCurrentTask := nil;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  // Parameterless wrapper around the scheduled task's DoExpire; Execute
  // passes it to Synchronize.
  procedure TScheduledWorker.ExpireTask;
  begin
    (fCurrentTask as IScheduledTask).DoExpire;
  end;
  1503. { TScheduledTasks }
  // Adds a scheduled task without parameters; delegates to the overload that
  // takes a parameter array.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := AddTask(aTaskName, [], False, aTaskProc);
  end;
  // Creates a scheduled task, names it, gives it the next sequential task id,
  // appends it to the task list and returns it.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newTask : TScheduledTask;
  begin
    newTask := TScheduledTask.Create(aParamArray, aOwnedParams, aTaskProc);
    newTask.Name := aTaskName;
    // ids follow the running count of pushed tasks
    Inc(fNumPushedTasks);
    newTask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newTask);
    Result := newTask;
  end;
  // Same as AddTask, but flags the task so that workers run its body through
  // Synchronize.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask(aTaskName, aParamArray, aOwnedParams, aTaskProc);
    TTask(Result).ExecuteWithSync := True;
  end;
  // Parameterless variant of AddTask_Sync; delegates to the overload that
  // takes a parameter array.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask_Sync(aTaskName, [], False, aTaskProc);
  end;
  // Creates the task list and fault policy and resets the counters. No
  // scheduler is created in this constructor.
  constructor TScheduledTasks.Create;
  begin
    fTaskList := TScheduledTaskList.Create;
    fFaultPolicy := TFaultPolicy.Create;
    fIsStarted := False;
    fNumPushedTasks := 0;
  end;
  // Stops and frees the scheduler thread (if one exists), then releases the
  // task list and the fault policy.
  destructor TScheduledTasks.Destroy;
  begin
    if fScheduler <> nil then
    begin
      // wait for the thread to finish before freeing it
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    // Free is a no-op on nil references
    fTaskList.Free;
    fFaultPolicy.Free;
    inherited;
  end;
  // Returns the scheduled task with the given id, as reported by the
  // scheduler.
  // Raises ETaskSchedulerError when no scheduler exists yet, matching the
  // name-based overload instead of dereferencing a nil scheduler.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  begin
    if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aIdTask);
  end;
  // Returns the scheduled task with the given name, as reported by the
  // scheduler. Raises ETaskSchedulerError when no scheduler exists yet.
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  begin
    if fScheduler = nil then
      raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aTaskName);
  end;
  // Starts scheduling. Creates the scheduler thread on first use, passing on
  // the RemoveTaskAfterExpiration setting. Does nothing if already started.
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;

    if fScheduler = nil then
    begin
      fScheduler := TScheduler.Create(fTaskList);
      fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    end;

    fScheduler.Start;
    fIsStarted := True;
  end;
  // Stops scheduling.
  // The scheduler thread is terminated, waited for and released, the same
  // sequence the destructor uses. Releasing it matters because Start only
  // creates a scheduler when none is assigned: if a terminated thread object
  // were kept, the next Start would call Start on a thread that has already
  // finished and could not be restarted.
  procedure TScheduledTasks.Stop;
  begin
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
      fScheduler := nil;
    end;
    fIsStarted := False;
  end;
  1572. { TScheduledTask }
  // Stores a named parameter on the task and returns the task itself so
  // calls can be chained.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  begin
    SetParam(aName, aValue);
    Result := Self;
  end;
  // Stores a named parameter on the task and returns the task itself so
  // calls can be chained.
  // aOwned is now forwarded to SetParam; previously it was accepted but
  // silently dropped, so this overload behaved exactly like the two-argument
  // one.
  // NOTE(review): relies on a SetParam(aName, aValue, aOwned) overload on the
  // ancestor task class, which is declared outside this section - confirm.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  begin
    Result := Self;
    SetParam(aName,aValue,aOwned);
  end;
  // Resets the schedule and sets a run-once start at aStartDate.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := aStartDate;
    fNextExecution := aStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start aMinutes minutes from now.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  var
    startAt : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startAt := IncMinute(Now(), aMinutes);
    fStartDate := startAt;
    fNextExecution := startAt;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start aSeconds seconds from now.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  var
    startAt : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startAt := IncSecond(Now(), aSeconds);
    fStartDate := startAt;
    fNextExecution := startAt;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start at the current time.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartNow: IScheduledTask;
  var
    startAt : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startAt := Now();
    fStartDate := startAt;
    fNextExecution := startAt;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start at 00:00:00 tomorrow.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartOnDayChange: IScheduledTask;
  var
    startAt : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startAt := ChangeTimeOfADay(Tomorrow(), 0, 0, 0);
    fStartDate := startAt;
    fNextExecution := startAt;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start today at the given time.
  // Returns the task itself so calls can be chained.
  // Uses ChangeTimeOfADay, as StartTomorrowAt does. The previous call to
  // ChangeDateOfADay was given hour/minute/second values, which does not
  // express "today at this time".
  function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    Result := Self;
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
  end;
  // Resets the schedule and sets a run-once start tomorrow at the given time.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  var
    startAt : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startAt := ChangeTimeOfADay(Tomorrow(), aHour, aMinute, aSecond);
    fStartDate := startAt;
    fNextExecution := startAt;
    Result := Self;
  end;
  // Sets a retry policy of aMaxRetries retries, wait time 0 and multiplier 1.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, 0, 1);
    Result := Self;
  end;
  // Sets a retry policy with max retries -1, wait time 0 and multiplier 1.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.RetryForever: IScheduledTask;
  begin
    SetRetryPolicy(-1, 0, 1);
    Result := Self;
  end;
  // Sets a retry policy of aMaxRetries retries with the given wait time (ms)
  // between retries and multiplier 1.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS, 1);
    Result := Self;
  end;
  // Sets a retry policy of aMaxRetries retries with the given wait time (ms)
  // and wait-time multiplier factor.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS, aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Sets a retry policy from an array of wait times.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  // Sets a retry policy with max retries -1, the given wait time (ms) and
  // multiplier 1.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(-1, aWaitTimeBetweenRetriesMS, 1);
    Result := Self;
  end;
  // Sets a retry policy with max retries -1, the given wait time (ms) and
  // wait-time multiplier factor.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  begin
    SetRetryPolicy(-1, aWaitTimeBetweenRetriesMS, aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Puts the task in repeat mode with the given interval and enables it.
  // If no start date was set the schedule is cleared first; a start date in
  // the past is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    if fStartDate = 0.0 then
      ClearSchedule;

    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;

    if fStartDate < Now() then
      fStartDate := Now();
    fNextExecution := fStartDate;

    fEnabled := True;
  end;
  // Puts the task in repeat mode with the given interval, stores aEndTime as
  // the expiration date and enables the task.
  // If no start date was set the schedule is cleared first; a start date in
  // the past is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    if fStartDate = 0.0 then
      ClearSchedule;

    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;

    if fStartDate < Now() then
      fStartDate := Now();
    fNextExecution := fStartDate;
    fExpirationDate := aEndTime;

    fEnabled := True;
  end;
  // Shortcut for repeating with an interval of 1 day.
  procedure TScheduledTask.RepeatEveryDay;
  begin
    RepeatEvery(1, tmDays);
  end;
  // Shortcut for repeating with an interval of 7 days.
  procedure TScheduledTask.RepeatEveryWeek;
  begin
    RepeatEvery(7, tmDays);
  end;
  // Puts the task in repeat mode with the given interval, stores
  // aRepeatTimes as the expiration count and enables the task.
  // If no start date was set the schedule is cleared first; a start date in
  // the past is moved to the current time.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    if fStartDate = 0.0 then
      ClearSchedule;

    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;

    if fStartDate < Now() then
      fStartDate := Now();
    fNextExecution := fStartDate;
    fExpirationTimes := aRepeatTimes;

    fEnabled := True;
  end;
  // Puts the task in run-once mode and enables it. A start date in the past
  // is moved to the current time.
  procedure TScheduledTask.RunOnce;
  begin
    fScheduleMode := TScheduleMode.smRunOnce;

    if fStartDate < Now() then
      fStartDate := Now();
    fNextExecution := fStartDate;

    fEnabled := True;
  end;
  // Marks the task as finished; the scheduler loop only checks tasks that
  // are not finished.
  procedure TScheduledTask.Cancel;
  begin
    fFinished := True;
  end;
  // Decides whether the task is due to run now and updates the schedule
  // bookkeeping. Returns True when the caller should execute the task.
  //  - Never due while the task is running.
  //  - Run-once mode: due the first time the start date is reached; the task
  //    is then marked finished.
  //  - Repeat mode: once the next execution time is reached, the task is
  //    marked finished if an expiration limit is hit; otherwise the next
  //    execution time advances by one interval and the task is due, unless
  //    the advanced time is still in the past.
  function TScheduledTask.CheckSchedule: Boolean;
  var
    expiredByCount : Boolean;
    expiredByDate : Boolean;
  begin
    Result := False;
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;

    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      // only the first run counts, and only once the start date is reached
      if (fExecutionTimes <> 0) or (Now() < fNextExecution) then Exit;
      fLastExecution := Now();
      Inc(fExecutionTimes);
      fFinished := True;
      Result := True;
      Exit;
    end;

    // repeat mode: nothing to do until the next execution time is reached
    if Now() < fNextExecution then Exit;

    // check expiration limits
    expiredByCount := (fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes);
    expiredByDate := (fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate);
    if expiredByCount or expiredByDate then
    begin
      fFinished := True;
      Exit;
    end;

    // calculate next execution
    case fTimeMeasure of
      tmDays : fNextExecution := IncDay(fNextExecution, fTimeInterval);
      tmHours : fNextExecution := IncHour(fNextExecution, fTimeInterval);
      tmMinutes : fNextExecution := IncMinute(fNextExecution, fTimeInterval);
      tmSeconds : fNextExecution := IncSecond(fNextExecution, fTimeInterval);
      tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
    end;

    // avoid execution if system time was altered (next slot already in the past)
    if Now() <= fNextExecution then
    begin
      fLastExecution := Now();
      Inc(fExecutionTimes);
      Result := True;
    end;
  end;
  // Resets the schedule: dates are zeroed, the finished flag is cleared,
  // mode becomes run-once, and the interval becomes 0 seconds.
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    fFinished := False;
    // dates
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
    // mode and interval
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeInterval := 0;
    fTimeMeasure := TTimeMeasure.tmSeconds;
  end;
  // Invokes the expired callback, if one is set, and then disables the task.
  procedure TScheduledTask.DoExpire;
  begin
    if Assigned(fExpiredProc) then
      fExpiredProc(Self);
    fEnabled := False;
  end;
  // Returns the current repeat setting as a pair: time measure as key,
  // interval as value.
  function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  begin
    Result.Key := fTimeMeasure;
    Result.Value := fTimeInterval;
  end;
  // Returns the task's name.
  function TScheduledTask.GetTaskName: string;
  begin
    Result := fName;
  end;
  // Returns the finished flag, which is set by Cancel and by CheckSchedule.
  function TScheduledTask.IsFinished: Boolean;
  begin
    Result := fFinished;
  end;
  // Stores the exception callback.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Stores the exception callback through OnException and sets the task's
  // ExceptionWithSync flag.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := OnException(aTaskProc);
    TTask(Result).ExceptionWithSync := True;
  end;
  // Stores the retry callback.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  // Stores the callback that DoExpire invokes.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  // Stores the expired callback through OnExpired and sets ExpireWithSync,
  // which the scheduled worker checks before calling the callback through
  // Synchronize.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnExpired(aTaskProc);
    TScheduledTask(Result).ExpireWithSync := True;
  end;
  // Stores the initialize callback.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  // Stores the terminate callback.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Stores the terminate callback through OnTerminated and sets
  // TerminateWithSync, which workers check before calling the callback
  // through Synchronize.
  // Returns the task itself so calls can be chained.
  function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnTerminated(aTaskProc);
    TTask(Result).TerminateWithSync := True;
  end;
  1839. { TScheduler }
  // Creates the scheduler thread suspended, not self-freeing, working on the
  // caller's task list. Also creates the list lock and the event that the
  // Execute loop waits on.
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  // Signals and frees the wait event, drops the reference to the task list
  // (the list itself is not freed here) and frees the lock.
  destructor TScheduler.Destroy;
  begin
    // signal the event before it is freed
    fCondVar.SetEvent;
    fCondVar.Free;

    fTaskList := nil;
    fListLock.Free;
    inherited;
  end;
  // Scheduler loop. Until the thread is terminated it walks the task list
  // under the list lock, starts a TScheduledWorker for every enabled,
  // unfinished task whose CheckSchedule returns True, and removes disabled
  // tasks when RemoveTaskAfterExpiration is set. Between passes it waits on
  // the event for up to 250 ms.
  // The list is walked by index rather than with for..in, because removing
  // an item while enumerating the same list made the enumerator skip the
  // item that followed the removed one.
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    idx : Integer;
  begin
    numworker := 0;
    while not Terminated do
    begin
      fListLock.Enter;
      try
        idx := 0;
        while idx < fTaskList.Count do
        begin
          task := fTaskList[idx];
          if (task.IsEnabled) and (not task.IsFinished) then
          begin
            if task.CheckSchedule then
            begin
              Inc(numworker);
              worker := TScheduledWorker.Create(numworker,task);
              worker.Start;
            end;
          end
          else
          begin
            if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then
            begin
              fTaskList.Delete(idx);
              // the following item has moved into this slot, so do not advance
              Continue;
            end;
          end;
          Inc(idx);
        end;
        task := nil;
      finally
        fListLock.Leave;
      end;
      fCondVar.WaitFor(250);
    end;
  end;
  // Appends aTask to the task list and returns whatever the list's Add
  // returns.
  // NOTE(review): unlike the Get overloads, this does not take fListLock.
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    Result := fTaskList.Add(aTask);
  end;
  // Returns the task whose IdTask equals aIdTask, or nil when the list has
  // no such task. The search runs under the list lock.
  // Result is initialised to nil because it was previously left unassigned
  // when nothing matched.
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  // Returns the first task whose name matches aTaskName (compared with
  // CompareText), or nil when the list has no such task. The search runs
  // under the list lock.
  // Result is initialised to nil because it was previously left unassigned
  // when nothing matched.
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  1927. { TAdvThread }
  // Creates a suspended, self-freeing thread that will run aProc. When
  // aSynchronize is True, Execute runs the procedure through Synchronize.
  constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    inherited Create(True);
    fExecuteProc := aProc;
    fExecuteWithSync := aSynchronize;
    FreeOnTerminate := True;
  end;
  // Runs the stored procedure, if any. If it raises and an exception
  // callback is set, the callback receives the exception object, which is
  // taken over with AcquireExceptionObject. Without a callback the exception
  // is re-raised.
  // NOTE(review): since the object is acquired, the callback presumably has
  // to free it - confirm with callers.
  // The re-raise uses a bare "raise": "raise e" raised the same object a
  // second time while the handler still owned it, so it would be freed twice.
  procedure TAdvThread.DoExecute;
  begin
    try
      if Assigned(fExecuteProc) then fExecuteProc;
    except
      on E : Exception do
      begin
        {$IFNDEF FPC}
          {$IF DELPHIRX10_UP}
          if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
          {$ELSE}
          if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
          {$ENDIF}
        {$ELSE}
        if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
        {$ENDIF}
        else raise;
      end;
    end;
  end;
  // Invokes the terminate callback, if one is set.
  procedure TAdvThread.CallToTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc;
  end;
  // Runs the terminate callback, through Synchronize when that was requested
  // in OnTerminate, otherwise directly.
  procedure TAdvThread.DoTerminate;
  begin
    if not fTerminateWithSync then
      CallToTerminate
    else
      Synchronize(CallToTerminate);
  end;
  // Thread body: runs DoExecute, through Synchronize when that was requested
  // in the constructor, otherwise directly.
  procedure TAdvThread.Execute;
  begin
    if not fExecuteWithSync then
      DoExecute
    else
      Synchronize(DoExecute);
  end;
  // Stores the callback that DoExecute invokes when the executed procedure
  // raises.
  procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  begin
    fExceptionProc := aProc;
  end;
  // Stores the terminate callback and whether DoTerminate should run it
  // through Synchronize.
  procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  begin
    fTerminateProc := aProc;
    fTerminateWithSync := aSynchronize;
  end;
  1978. { TRunTask }
  // Runs aTaskProc on a new worker with no parameters; delegates to the
  // overload that takes a parameter array.
  class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Execute([], False, aTaskProc);
  end;
  // Parameterless variant of Execute_Sync; delegates to the overload that
  // takes a parameter array.
  class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Execute_Sync([], False, aTaskProc);
  end;
  // Creates a work task with ExecuteWithSync off, starts a TSimpleWorker for
  // it and returns the task.
  class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    newTask : TWorkTask;
    runner : TSimpleWorker;
  begin
    newTask := TWorkTask.Create(aParamArray, aOwnedParams, aTaskProc);
    newTask.ExecuteWithSync := False;
    // assign the result before the worker is started
    Result := newTask;
    runner := TSimpleWorker.Create(newTask);
    runner.Start;
  end;
  // Creates a work task with ExecuteWithSync on (the worker runs the task
  // body through Synchronize), starts a TSimpleWorker for it and returns the
  // task.
  class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    newTask : TWorkTask;
    runner : TSimpleWorker;
  begin
    newTask := TWorkTask.Create(aParamArray, aOwnedParams, aTaskProc);
    newTask.ExecuteWithSync := True;
    // assign the result before the worker is started
    Result := newTask;
    runner := TSimpleWorker.Create(newTask);
    runner.Start;
  end;
  2009. { TParamValue }
  // Creates a named parameter from a TFlexValue. aOwnedValue is stored as
  // the owned flag, which the destructor checks before freeing an object
  // value.
  constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Creates a named parameter from a TVarRec. aOwnedValue is stored as the
  // owned flag, which the destructor checks before freeing an object value.
  constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Creates an unnamed parameter that is not owned.
  constructor TParamValue.Create;
  begin
    fOwned := False;
    fName := '';
  end;
  // Releases the value. Under FPC the flex value is released explicitly
  // first. If the parameter is owned and holds a non-nil object, that object
  // is freed.
  destructor TParamValue.Destroy;
  begin
    {$IFDEF FPC}
    fValue._Release;
    {$ENDIF}
    if fOwned then
    begin
      if fValue.IsObject then
      begin
        if fValue.AsObject <> nil then
          fValue.AsObject.Free;
      end;
    end;
    inherited;
  end;
  2037. end.