Quick.Threads.pas 65 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286
  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 01/12/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue;
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. {$IFNDEF FPC}
  209. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  210. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  211. {$ELSE}
  212. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  213. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  214. {$ENDIF}
  215. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  216. property Result : TFlexValue read GetResult write SetResult;
  217. property IdTask : Int64 read GetIdTask;
  218. function Done : Boolean;
  219. function Failed : Boolean;
  220. function NumRetries : Integer;
  221. function MaxRetries : Integer;
  222. function LastException : Exception;
  223. function CircuitBreaked : Boolean;
  224. function IsEnabled : Boolean;
  225. end;
  226. {$IFNDEF FPC}
  227. TTaskProc = reference to procedure(task : ITask);
  228. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  229. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  230. {$ELSE}
  231. TTaskProc = procedure(task : ITask) of object;
  232. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  233. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  234. {$ENDIF}
  235. IWorkTask = interface(ITask)
  236. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  237. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  238. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  239. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  240. function Retry(aMaxRetries : Integer) : IWorkTask;
  241. function RetryForever : IWorkTask;
  242. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  243. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  244. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  245. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  246. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  247. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  248. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  249. procedure Run;
  250. end;
  251. IScheduledTask = interface(ITask)
  252. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  253. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  254. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  255. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  256. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  257. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  259. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  260. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  261. function Retry(aMaxRetries : Integer) : IScheduledTask;
  262. function RetryForever : IScheduledTask;
  263. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  264. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  265. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  266. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  267. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  268. function CheckSchedule : Boolean;
  269. procedure DoExpire;
  270. function GetTaskName : string;
  271. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  272. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  273. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  274. function StartOnDayChange : IScheduledTask;
  275. function StartNow : IScheduledTask;
  276. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  277. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  278. procedure RunOnce;
  279. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  280. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  281. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  282. procedure RepeatEveryDay;
  283. procedure RepeatEveryWeek;
  284. function IsFinished : Boolean;
  285. procedure Cancel;
  286. property Name : string read GetTaskName;
  287. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  288. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  289. end;
  290. TTask = class(TInterfacedObject,ITask)
  291. private
  292. fIdTask : Int64;
  293. fNumWorker : Integer;
  294. fNumRetries : Integer;
  295. fParamList : TParamList;
  296. fInitializeProc : TTaskProc;
  297. fExecuteProc : TTaskProc;
  298. fExceptProc : TTaskExceptionProc;
  299. fTerminateProc : TTaskProc;
  300. fExpiredProc : TTaskProc;
  301. fTaskStatus : TWorkTaskStatus;
  302. fOwnedParams : Boolean;
  303. fEnabled : Boolean;
  304. fExecuteWithSync : Boolean;
  305. fExceptionWithSync : Boolean;
  306. fRetryProc : TTaskRetryProc;
  307. fTerminateWithSync : Boolean;
  308. fFaultControl : TFaultControl;
  309. fCustomFaultPolicy : Boolean;
  310. fResult : TFlexValue;
  311. function GetParam(aIndex : Integer) : TFlexValue; overload;
  312. function GetParam(const aName : string) : TFlexValue; overload;
  313. function GetParam2(aIndex : Integer) : PFlexValue;
  314. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  315. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  316. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  317. procedure DoInitialize;
  318. procedure DoExecute;
  319. procedure DoException(aException : Exception);
  320. procedure DoTerminate;
  321. function GetNumWorker : Integer;
  322. procedure SetNumWorker(Value : Integer);
  323. function GetIdTask : Int64;
  324. procedure SetIdTask(Value : Int64);
  325. function GetResult : TFlexValue;
  326. procedure SetResult(aValue : TFlexValue);
  327. protected
  328. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  329. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  330. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  331. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  332. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  333. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  334. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  335. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  336. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  337. public
  338. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  339. destructor Destroy; override;
  340. property IdTask : Int64 read GetIdTask;
  341. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  342. function IsEnabled : Boolean;
  343. function TaskStatus : TWorkTaskStatus;
  344. function Done : Boolean;
  345. function Failed : Boolean;
  346. function NumRetries : Integer;
  347. function MaxRetries : Integer;
  348. function LastException : Exception;
  349. function CircuitBreaked : Boolean;
  350. end;
  351. TWorkTask = class(TTask,IWorkTask)
  352. public
  353. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  354. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  355. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  356. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  357. function Retry(aMaxRetries : Integer) : IWorkTask;
  358. function RetryForever : IWorkTask;
  359. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  360. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  361. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  362. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  363. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  364. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  365. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  366. procedure Run; virtual;
  367. end;
  368. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  369. TScheduledTask = class(TTask,IScheduledTask)
  370. private
  371. fName : string;
  372. fExecutionTimes : Integer;
  373. fScheduleMode : TScheduleMode;
  374. fTimeInterval : Integer;
  375. fTimeMeasure : TTimeMeasure;
  376. fStartDate : TDateTime;
  377. fLastExecution : TDateTime;
  378. fNextExecution : TDateTime;
  379. fExpirationDate : TDateTime;
  380. fExpirationTimes : Integer;
  381. fFinished : Boolean;
  382. fExpireWithSync: Boolean;
  383. procedure ClearSchedule;
  384. function CheckSchedule : Boolean;
  385. procedure DoExpire;
  386. function GetTaskName : string;
  387. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  388. protected
  389. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  390. public
  391. property Name : string read fName write fName;
  392. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  393. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  394. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  395. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  396. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  397. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  398. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  399. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  400. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  401. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  402. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  403. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  404. function StartOnDayChange : IScheduledTask;
  405. function StartNow : IScheduledTask;
  406. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  407. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  408. procedure RunOnce;
  409. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  410. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  411. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  412. procedure RepeatEveryDay;
  413. procedure RepeatEveryWeek;
  414. function Retry(aMaxRetries : Integer) : IScheduledTask;
  415. function RetryForever : IScheduledTask;
  416. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  417. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  418. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  419. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  420. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  421. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  422. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  423. function IsFinished : Boolean;
  424. procedure Cancel;
  425. end;
  426. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  427. TWorker = class(TThread)
  428. protected
  429. fStatus : TWorkerStatus;
  430. fCurrentTask : ITask;
  431. fDefaultFaultPolicy : TFaultPolicy;
  432. procedure ExecuteTask;
  433. procedure TerminateTask;
  434. public
  435. constructor Create;
  436. destructor Destroy; override;
  437. property Status : TWorkerStatus read fStatus;
  438. procedure SetFaultPolicy(aTask : TTask);
  439. procedure Execute; override;
  440. end;
  441. TSimpleWorker = class(TWorker)
  442. public
  443. constructor Create(aTask : ITask);
  444. procedure Execute; override;
  445. end;
  446. TQueueWorker = class(TWorker)
  447. private
  448. fCurrentIdTask : Integer;
  449. fNumWorker : Integer;
  450. fTaskQueue : TTaskQueue;
  451. public
  452. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  453. property NumWorker : Integer read fNumWorker;
  454. procedure Execute; override;
  455. end;
  456. TScheduledWorker = class(TWorker)
  457. private
  458. procedure ExpireTask;
  459. public
  460. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  461. procedure Execute; override;
  462. end;
  463. TWorkerPool = TObjectList<TWorker>;
  464. TRunTask = class
  465. public
  466. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  467. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  468. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  469. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  470. end;
  471. TBackgroundTasks = class
  472. private
  473. fMaxQueue : Integer;
  474. fWorkerPool : TWorkerPool;
  475. fConcurrentWorkers : Integer;
  476. fInsertTimeout : Cardinal;
  477. fExtractTimeout : Cardinal;
  478. fTaskQueue : TTaskQueue;
  479. fNumPushedTasks : Int64;
  480. function GetTaskQueue : Cardinal;
  481. public
  482. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  483. destructor Destroy; override;
  484. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  485. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  486. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  487. property TaskQueued : Cardinal read GetTaskQueue;
  488. property NumPushedTasks : Int64 read fNumPushedTasks;
  489. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  490. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  491. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  492. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  493. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  494. procedure Start;
  495. procedure CancelAll;
  496. end;
  497. TScheduledTaskList = TList<IScheduledTask>;
  498. TScheduler = class(TThread)
  499. private
  500. fListLock : TCriticalSection;
  501. fCondVar : TSimpleEvent;
  502. fTaskList : TScheduledTaskList;
  503. fRemoveTaskAfterExpiration : Boolean;
  504. public
  505. constructor Create(aTaskList : TScheduledTaskList);
  506. destructor Destroy; override;
  507. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  508. procedure Execute; override;
  509. function Add(aTask : TScheduledTask) : Integer;
  510. function Get(aIdTask : Int64) : IScheduledTask; overload;
  511. function Get(const aTaskName : string) : IScheduledTask; overload;
  512. end;
  513. TScheduledTasks = class
  514. private
  515. fTaskList : TScheduledTaskList;
  516. fScheduler : TScheduler;
  517. fNumPushedTasks : Int64;
  518. fRemoveTaskAfterExpiration : Boolean;
  519. fIsStarted : Boolean;
  520. fFaultPolicy : TFaultPolicy;
  521. public
  522. constructor Create;
  523. destructor Destroy; override;
  524. property NumPushedTasks : Int64 read fNumPushedTasks;
  525. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  526. property IsStarted : Boolean read fIsStarted;
  527. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  528. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  529. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  530. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  531. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  532. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  533. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  534. procedure Start;
  535. procedure Stop;
  536. end;
  537. implementation
  538. { TThreadedQueueCS<T> }
  539. procedure TThreadedQueueCS<T>.Clear;
  540. var
  541. obj : T;
  542. begin
  543. FQueueLock.Enter;
  544. try
  545. for obj in FQueue do
  546. begin
  547. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  548. end;
  549. SetLength(FQueue,0);
  550. finally
  551. FQueueLock.Leave;
  552. end;
  553. {$IFDEF FPC}
  554. FQueueCondVar.SetEvent;
  555. {$ELSE}
  556. FQueueCondVar.ReleaseAll;
  557. {$ENDIF}
  558. end;
  559. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  560. begin
  561. inherited Create;
  562. if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
  563. SetLength(FQueue, AQueueDepth);
  564. FQueueLock := TCriticalSection.Create;
  565. {$IFDEF FPC}
  566. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  567. {$ELSE}
  568. FQueueCondVar := TConditionVariableCS.Create;
  569. {$ENDIF}
  570. FPushTimeout := PushTimeout;
  571. FPopTimeout := PopTimeout;
  572. end;
  573. destructor TThreadedQueueCS<T>.Destroy;
  574. begin
  575. DoShutDown;
  576. FQueueLock.Free;
  577. FQueueCondVar.Free;
  578. inherited;
  579. end;
  580. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  581. begin
  582. FQueueLock.Enter;
  583. try
  584. SetLength(FQueue, Length(FQueue) + ADelta);
  585. finally
  586. FQueueLock.Leave;
  587. end;
  588. {$IFDEF FPC}
  589. FQueueCondVar.SetEvent;
  590. {$ELSE}
  591. FQueueCondVar.ReleaseAll;
  592. {$ENDIF}
  593. end;
  594. function TThreadedQueueCS<T>.PopItem: T;
  595. var
  596. LQueueSize: Integer;
  597. begin
  598. PopItem(LQueueSize, Result);
  599. end;
  600. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  601. begin
  602. AItem := Default(T);
  603. FQueueLock.Enter;
  604. try
  605. Result := wrSignaled;
  606. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  607. begin
  608. {$IFDEF FPC}
  609. Result := FQueueCondVar.WaitFor(FPopTimeout);
  610. {$ELSE}
  611. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  612. {$ENDIF}
  613. end;
  614. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  615. AItem := FQueue[FQueueOffset];
  616. FQueue[FQueueOffset] := Default(T);
  617. if FQueueSize = Length(FQueue) then
  618. begin
  619. {$IFDEF FPC}
  620. FQueueCondVar.SetEvent;
  621. {$ELSE}
  622. FQueueCondVar.ReleaseAll;
  623. {$ENDIF}
  624. end;
  625. Dec(FQueueSize);
  626. Inc(FQueueOffset);
  627. Inc(FTotalItemsPopped);
  628. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  629. finally
  630. AQueueSize := FQueueSize;
  631. FQueueLock.Leave;
  632. end;
  633. end;
  634. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  635. var
  636. LQueueSize: Integer;
  637. begin
  638. Result := PopItem(LQueueSize, AItem);
  639. end;
  640. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  641. begin
  642. PopItem(AQueueSize, Result);
  643. end;
  644. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  645. var
  646. LQueueSize: Integer;
  647. begin
  648. Result := PushItem(AItem, LQueueSize);
  649. end;
  650. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  651. begin
  652. FQueueLock.Enter;
  653. try
  654. if FQueueSize >= High(FQueue) then
  655. begin
  656. if FQueueSize < 1024 then Grow(FQueueSize)
  657. else Grow(FQueueSize Div 2);
  658. end;
  659. Result := wrSignaled;
  660. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  661. begin
  662. {$IFDEF FPC}
  663. Result := FQueueCondVar.WaitFor(FPushTimeout);
  664. {$ELSE}
  665. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  666. {$ENDIF}
  667. end;
  668. if FShutDown or (Result <> wrSignaled) then Exit;
  669. if FQueueSize = 0 then
  670. begin
  671. {$IFDEF FPC}
  672. FQueueCondVar.SetEvent;
  673. {$ELSE}
  674. FQueueCondVar.ReleaseAll;
  675. {$ENDIF}
  676. end;
  677. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  678. Inc(FQueueSize);
  679. Inc(FTotalItemsPushed);
  680. finally
  681. AQueueSize := FQueueSize;
  682. FQueueLock.Leave;
  683. end;
  684. end;
  685. procedure TThreadedQueueCS<T>.DoShutDown;
  686. begin
  687. FShutDown := True;
  688. {$IFDEF FPC}
  689. FQueueCondVar.SetEvent;
  690. {$ELSE}
  691. FQueueCondVar.ReleaseAll;
  692. {$ENDIF}
  693. end;
  694. { TThreadedQueueList<T> }
  695. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  696. begin
  697. inherited Create;
  698. fQueue := TQueue<T>.Create;
  699. fQueue.Capacity := AQueueDepth;
  700. fQueueSize := 0;
  701. fQueueLock := TCriticalSection.Create;
  702. {$IFDEF FPC}
  703. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  704. {$ELSE}
  705. fQueueCondVar := TConditionVariableCS.Create;
  706. {$ENDIF}
  707. fPushTimeout := PushTimeout;
  708. fPopTimeout := PopTimeout;
  709. end;
  710. destructor TThreadedQueueList<T>.Destroy;
  711. begin
  712. DoShutDown;
  713. fQueueLock.Free;
  714. fQueueCondVar.Free;
  715. fQueue.Free;
  716. inherited;
  717. end;
  718. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  719. begin
  720. fQueueLock.Enter;
  721. try
  722. fQueue.Capacity := fQueue.Capacity + ADelta;
  723. finally
  724. fQueueLock.Leave;
  725. end;
  726. {$IFDEF FPC}
  727. FQueueCondVar.SetEvent;
  728. {$ELSE}
  729. FQueueCondVar.ReleaseAll;
  730. {$ENDIF}
  731. end;
  732. function TThreadedQueueList<T>.PopItem: T;
  733. var
  734. LQueueSize: Integer;
  735. begin
  736. PopItem(LQueueSize, Result);
  737. end;
  738. {$IFDEF FPC}
  739. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  740. //var
  741. //crono : TChronometer;
  742. begin
  743. AItem := Default(T);
  744. //crono := TChronometer.Create(False);
  745. try
  746. Result := wrSignaled;
  747. //writeln('popitem');
  748. //crono.Start;
  749. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  750. begin
  751. //crono.Start;
  752. Result := FQueueCondVar.WaitFor(FPopTimeout);
  753. //crono.Stop;
  754. //writeln('in: ' + crono.ElapsedTime);
  755. //if result = twaitresult.wrError then result := twaitresult.wrError;
  756. end;
  757. //crono.Stop;
  758. //writeln('out: ' + crono.ElapsedTime);
  759. fQueueLock.Enter;
  760. try
  761. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  762. AItem := fQueue.Extract;
  763. Dec(FQueueSize);
  764. Inc(fTotalItemsPopped);
  765. finally
  766. fQueueLock.Leave;
  767. end;
  768. finally
  769. AQueueSize := fQueueSize;
  770. end;
  771. end;
  772. {$ELSE}
  773. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  774. begin
  775. AItem := Default(T);
  776. fQueueLock.Enter;
  777. try
  778. Result := wrSignaled;
  779. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  780. begin
  781. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  782. end;
  783. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  784. AItem := fQueue.Extract;
  785. if fQueueSize = fQueue.Count then
  786. begin
  787. FQueueCondVar.ReleaseAll;
  788. end;
  789. Dec(FQueueSize);
  790. Inc(fTotalItemsPopped);
  791. finally
  792. AQueueSize := fQueueSize;
  793. fQueueLock.Leave;
  794. end;
  795. end;
  796. {$ENDIF}
  797. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  798. var
  799. LQueueSize: Integer;
  800. begin
  801. Result := PopItem(LQueueSize, AItem);
  802. end;
  803. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  804. begin
  805. PopItem(AQueueSize, Result);
  806. end;
  807. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  808. var
  809. LQueueSize: Integer;
  810. begin
  811. Result := PushItem(AItem, LQueueSize);
  812. end;
  813. {$IFDEF FPC}
  814. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  815. begin
  816. FQueueLock.Enter;
  817. try
  818. if FQueueSize >= fQueue.Count then Grow(10);
  819. Result := wrSignaled;
  820. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  821. //begin
  822. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  823. //end;
  824. if fShutDown or (Result <> wrSignaled) then Exit;
  825. //if fQueueSize = 0 then
  826. //begin
  827. // FQueueCondVar.SetEvent;
  828. //end;
  829. fQueue.Enqueue(AItem);
  830. Inc(FQueueSize);
  831. Inc(fTotalItemsPushed);
  832. finally
  833. AQueueSize := fQueueSize;
  834. FQueueLock.Leave;
  835. //FQueueCondVar.SetEvent;
  836. end;
  837. end;
  838. {$ELSE}
  839. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  840. begin
  841. FQueueLock.Enter;
  842. try
  843. Result := wrSignaled;
  844. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  845. //begin
  846. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  847. //end;
  848. if fShutDown or (Result <> wrSignaled) then Exit;
  849. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  850. fQueue.Enqueue(AItem);
  851. Inc(FQueueSize);
  852. Inc(fTotalItemsPushed);
  853. finally
  854. AQueueSize := fQueueSize;
  855. FQueueLock.Leave;
  856. end;
  857. end;
  858. {$ENDIF}
  859. procedure TThreadedQueueList<T>.DoShutDown;
  860. begin
  861. fShutDown := True;
  862. {$IFDEF FPC}
  863. FQueueCondVar.SetEvent;
  864. {$ELSE}
  865. FQueueCondVar.ReleaseAll;
  866. {$ENDIF}
  867. end;
  868. {$IFNDEF FPC}
  869. { TThreadObjectList<T> }
  870. procedure TThreadObjectList<T>.Add(const Item: T);
  871. begin
  872. LockList;
  873. try
  874. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  875. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  876. finally
  877. UnlockList;
  878. end;
  879. end;
  880. procedure TThreadObjectList<T>.Clear;
  881. begin
  882. LockList;
  883. try
  884. fList.Clear;
  885. finally
  886. UnlockList;
  887. end;
  888. end;
  889. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  890. begin
  891. inherited Create;
  892. fLock := TObject.Create;
  893. fList := TObjectList<T>.Create;
  894. fDuplicates := dupIgnore;
  895. end;
  896. destructor TThreadObjectList<T>.Destroy;
  897. begin
  898. LockList;
  899. try
  900. fList.Free;
  901. inherited Destroy;
  902. finally
  903. UnlockList;
  904. fLock.Free;
  905. end;
  906. end;
  907. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  908. begin
  909. LockList;
  910. try
  911. Result := fList[aIndex];
  912. finally
  913. UnlockList;
  914. end;
  915. end;
  916. function TThreadObjectList<T>.LockList: TObjectList<T>;
  917. begin
  918. System.TMonitor.Enter(fLock);
  919. Result := fList;
  920. end;
  921. procedure TThreadObjectList<T>.Remove(const Item: T);
  922. begin
  923. RemoveItem(Item, TDirection.FromBeginning);
  924. end;
  925. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  926. begin
  927. LockList;
  928. try
  929. fList.RemoveItem(Item, Direction);
  930. finally
  931. UnlockList;
  932. end;
  933. end;
  934. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  935. begin
  936. LockList;
  937. try
  938. fList[aIndex] := aValue;
  939. finally
  940. UnlockList;
  941. end;
  942. end;
  943. procedure TThreadObjectList<T>.UnlockList;
  944. begin
  945. System.TMonitor.Exit(fLock);
  946. end;
  947. {$ENDIF}
  948. { TAnonymousThread }
  949. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  950. begin
  951. fThread := TAdvThread.Create(aProc,aSynchronize);
  952. end;
  953. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  954. begin
  955. Result := TAnonymousThread.Create(aProc,False);
  956. end;
  957. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  958. begin
  959. Result := TAnonymousThread.Create(aProc,True);
  960. end;
  961. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  962. begin
  963. Result := Self;
  964. fThread.OnException(aProc);
  965. end;
  966. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  967. begin
  968. Result := Self;
  969. fThread.OnTerminate(aProc,False);
  970. end;
  971. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  972. begin
  973. Result := Self;
  974. fThread.OnTerminate(aProc,True);
  975. end;
  976. procedure TAnonymousThread.Start;
  977. begin
  978. fThread.Start;
  979. end;
  980. { TTask }
  981. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  982. var
  983. i : Integer;
  984. begin
  985. fTaskStatus := TWorkTaskStatus.wtsPending;
  986. fCustomFaultPolicy := False;
  987. fNumRetries := 0;
  988. fExecuteWithSync := False;
  989. fTerminateWithSync := False;
  990. fExceptionWithSync := False;
  991. fFaultControl := TFaultControl.Create;
  992. fFaultControl.OnRetry := DoRetry;
  993. fOwnedParams := aOwnedParams;
  994. fParamList := TParamList.Create(True);
  995. for i := Low(aParamArray) to High(aParamArray) do
  996. begin
  997. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  998. {$IFDEF FPC}
  999. fParamList[i].Value._AddRef;
  1000. {$ENDIF}
  1001. end;
  1002. fExecuteProc := aTaskProc;
  1003. fEnabled := False;
  1004. end;
  1005. destructor TTask.Destroy;
  1006. begin
  1007. fFaultControl.Free;
  1008. //free passed params
  1009. fParamList.Free;
  1010. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1011. inherited;
  1012. end;
  1013. procedure TTask.DoException(aException : Exception);
  1014. begin
  1015. fTaskStatus := TWorkTaskStatus.wtsException;
  1016. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1017. else raise aException;
  1018. end;
  1019. procedure TTask.DoExecute;
  1020. begin
  1021. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1022. DoInitialize;
  1023. repeat
  1024. try
  1025. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1026. fTaskStatus := TWorkTaskStatus.wtsDone;
  1027. fFaultControl.SuccessExecution;
  1028. except
  1029. on E : Exception do
  1030. begin
  1031. fTaskStatus := TWorkTaskStatus.wtsException;
  1032. {$IFNDEF FPC}
  1033. {$IF DELPHIRX10_UP}
  1034. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1035. {$ELSE}
  1036. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1037. {$ENDIF}
  1038. {$ELSE}
  1039. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1040. {$ENDIF}
  1041. end;
  1042. end;
  1043. until not fFaultControl.NeedToRetry;
  1044. end;
  1045. procedure TTask.DoInitialize;
  1046. begin
  1047. try
  1048. fFaultControl.Reset;
  1049. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1050. except
  1051. on E : Exception do
  1052. begin
  1053. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1054. end;
  1055. end;
  1056. end;
  1057. function TTask.Done: Boolean;
  1058. begin
  1059. Result := not fFaultControl.TaskFailed;
  1060. end;
  1061. function TTask.Failed: Boolean;
  1062. begin
  1063. Result := fFaultControl.TaskFailed;
  1064. end;
  1065. function TTask.CircuitBreaked: Boolean;
  1066. begin
  1067. Result := fFaultControl.CircuitBreaked;
  1068. end;
  1069. function TTask.LastException: Exception;
  1070. begin
  1071. Result := fFaultControl.LastException;
  1072. end;
  1073. function TTask.MaxRetries: Integer;
  1074. begin
  1075. Result := fFaultControl.MaxRetries;
  1076. end;
  1077. function TTask.NumRetries: Integer;
  1078. begin
  1079. Result := fFaultControl.NumRetries;
  1080. end;
  1081. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1082. begin
  1083. vStopRetries := False;
  1084. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1085. end;
  1086. procedure TTask.DoTerminate;
  1087. begin
  1088. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1089. end;
  1090. function TTask.GetIdTask: Int64;
  1091. begin
  1092. Result := fIdTask;
  1093. end;
  1094. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1095. begin
  1096. {$IFDEF FPC}
  1097. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1098. {$ENDIF}
  1099. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1100. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1101. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1102. end;
  1103. procedure TTask.SetIdTask(Value : Int64);
  1104. begin
  1105. fIdTask := Value;
  1106. end;
  1107. function TTask.GetNumWorker: Integer;
  1108. begin
  1109. Result := fNumWorker;
  1110. end;
  1111. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1112. begin
  1113. Result := fParamList[aIndex].Value;
  1114. end;
  1115. function TTask.GetParam(const aName: string): TFlexValue;
  1116. var
  1117. paramvalue : TParamValue;
  1118. begin
  1119. for paramvalue in fParamList do
  1120. begin
  1121. if CompareText(paramvalue.Name,aName) = 0 then
  1122. begin
  1123. Exit(paramvalue.Value)
  1124. end;
  1125. end;
  1126. //if not exists
  1127. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1128. end;
  1129. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1130. begin
  1131. Result := @fParamList[aIndex].Value;
  1132. end;
  1133. function TTask.GetResult: TFlexValue;
  1134. begin
  1135. Result := fResult;
  1136. end;
  1137. function TTask.IsEnabled: Boolean;
  1138. begin
  1139. Result := fEnabled;
  1140. end;
  1141. procedure TTask.SetNumWorker(Value: Integer);
  1142. begin
  1143. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1144. fNumWorker := Value;
  1145. end;
  1146. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1147. begin
  1148. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1149. fParamList[aIndex].Value := Value;
  1150. end;
  1151. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1152. var
  1153. paramvalue : TParamValue;
  1154. begin
  1155. //check if already exists parameter
  1156. for paramvalue in fParamList do
  1157. begin
  1158. if CompareText(paramvalue.Name,aName) = 0 then
  1159. begin
  1160. paramvalue.Value := Value;
  1161. Exit;
  1162. end;
  1163. end;
  1164. //if not exists, create one
  1165. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1166. end;
  1167. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1168. begin
  1169. SetParam(aName,Value,False);
  1170. end;
  1171. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1172. begin
  1173. if aWaitTimeMultiplierFactor = 0 then raise ETaskParamError.Create('WaitTimeMultiplierFactor cannot be 0');
  1174. fFaultControl.MaxRetries := aMaxRetries;
  1175. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1176. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1177. fCustomFaultPolicy := True;
  1178. end;
  1179. procedure TTask.SetResult(aValue: TFlexValue);
  1180. begin
  1181. fResult := aValue;
  1182. end;
  1183. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1184. begin
  1185. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1186. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1187. fFaultControl.WaitTimeMultiplierFactor := 1;
  1188. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1189. fCustomFaultPolicy := True;
  1190. end;
  1191. function TTask.TaskStatus: TWorkTaskStatus;
  1192. begin
  1193. Result := fTaskStatus;
  1194. end;
  1195. { TWorkTask }
  1196. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1197. begin
  1198. fExceptProc := aTaskProc;
  1199. Result := Self;
  1200. end;
  1201. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1202. begin
  1203. fInitializeProc := aTaskProc;
  1204. Result := Self;
  1205. end;
  1206. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1207. begin
  1208. fRetryProc := aTaskProc;
  1209. Result := Self;
  1210. end;
  1211. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1212. begin
  1213. fTerminateProc := aTaskProc;
  1214. Result := Self;
  1215. end;
  1216. procedure TWorkTask.Run;
  1217. begin
  1218. fEnabled := True;
  1219. end;
  1220. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1221. begin
  1222. Result := Self;
  1223. SetParam(aName,aValue);
  1224. end;
  1225. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1226. begin
  1227. Result := Self;
  1228. SetParam(aName,aValue,aOwned);
  1229. end;
  1230. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1231. begin
  1232. Result := Self;
  1233. SetRetryPolicy(aMaxRetries,0,1);
  1234. end;
  1235. function TWorkTask.RetryForever: IWorkTask;
  1236. begin
  1237. Result := Self;
  1238. SetRetryPolicy(-1,0,1);
  1239. end;
  1240. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1241. begin
  1242. Result := Self;
  1243. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1244. end;
  1245. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1246. begin
  1247. Result := Self;
  1248. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1249. end;
  1250. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1251. begin
  1252. Result := Self;
  1253. SetRetryPolicy(aWaitTimeArray);
  1254. end;
  1255. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1256. begin
  1257. Result := Self;
  1258. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1259. end;
  1260. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1261. begin
  1262. Result := Self;
  1263. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1264. end;
  1265. { TBackgroundTasks }
  1266. procedure TBackgroundTasks.CancelAll;
  1267. begin
  1268. fTaskQueue.Clear;
  1269. end;
  1270. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  1271. begin
  1272. fMaxQueue := 0;
  1273. fConcurrentWorkers := aConcurrentWorkers;
  1274. fInsertTimeout := INFINITE;
  1275. fExtractTimeout := 5000;
  1276. fNumPushedTasks := 0;
  1277. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  1278. end;
  1279. destructor TBackgroundTasks.Destroy;
  1280. begin
  1281. CancelAll;
  1282. fTaskQueue.DoShutDown;
  1283. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1284. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1285. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1286. inherited;
  1287. end;
  1288. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1289. begin
  1290. Result := fTaskQueue.QueueSize;
  1291. end;
  1292. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1293. begin
  1294. Result := AddTask([],False,aTaskProc);
  1295. end;
  1296. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1297. var
  1298. worktask : IWorkTask;
  1299. begin
  1300. if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
  1301. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1302. Inc(fNumPushedTasks);
  1303. worktask.SetIdTask(fNumPushedTasks);
  1304. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1305. begin
  1306. Result := worktask;
  1307. end;
  1308. end;
  1309. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1310. begin
  1311. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1312. TTask(Result).ExecuteWithSync := True;
  1313. end;
  1314. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1315. begin
  1316. Result := AddTask_Sync([],False,aTaskProc);
  1317. end;
  1318. procedure TBackgroundTasks.Start;
  1319. var
  1320. i : Integer;
  1321. worker : TWorker;
  1322. begin
  1323. //create workers
  1324. if fWorkerPool <> nil then fWorkerPool.Free;
  1325. fWorkerPool := TWorkerPool.Create(True);
  1326. for i := 1 to fConcurrentWorkers do
  1327. begin
  1328. worker := TQueueWorker.Create(i,fTaskQueue);
  1329. fWorkerPool.Add(worker);
  1330. worker.Start;
  1331. end;
  1332. end;
  1333. { TWorker }
  1334. constructor TWorker.Create;
  1335. begin
  1336. inherited Create(True);
  1337. fDefaultFaultPolicy := TFaultPolicy.Create;
  1338. fStatus := TWorkerStatus.wsSuspended;
  1339. FreeOnTerminate := False;
  1340. end;
  1341. destructor TWorker.Destroy;
  1342. begin
  1343. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1344. inherited;
  1345. end;
  1346. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1347. begin
  1348. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1349. end;
  1350. procedure TWorker.Execute;
  1351. begin
  1352. end;
  1353. procedure TWorker.ExecuteTask;
  1354. begin
  1355. fCurrentTask.DoExecute;
  1356. end;
  1357. procedure TWorker.TerminateTask;
  1358. begin
  1359. fCurrentTask.DoTerminate;
  1360. end;
  1361. { TSimpleWorker }
  1362. constructor TSimpleWorker.Create(aTask : ITask);
  1363. begin
  1364. inherited Create;
  1365. fCurrentTask := aTask;
  1366. FreeOnTerminate := True;
  1367. end;
  1368. procedure TSimpleWorker.Execute;
  1369. begin
  1370. fStatus := TWorkerStatus.wsIdle;
  1371. while not Terminated do
  1372. begin
  1373. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1374. try
  1375. fStatus := TWorkerStatus.wsWorking;
  1376. try
  1377. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1378. else fCurrentTask.DoExecute;
  1379. except
  1380. on E : Exception do
  1381. begin
  1382. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1383. else raise ETaskExecutionError.Create(e.Message);
  1384. end;
  1385. end;
  1386. finally
  1387. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1388. else fCurrentTask.DoTerminate;
  1389. fStatus := TWorkerStatus.wsIdle;
  1390. Terminate;
  1391. end;
  1392. end;
  1393. fStatus := TWorkerStatus.wsSuspended
  1394. end;
  1395. { TQueueWorker }
  1396. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1397. begin
  1398. inherited Create;
  1399. fNumWorker := aNumWorker;
  1400. fTaskQueue := aTaskQueue;
  1401. end;
  1402. procedure TQueueWorker.Execute;
  1403. begin
  1404. fStatus := TWorkerStatus.wsIdle;
  1405. while not Terminated do
  1406. begin
  1407. fCurrentTask := fTaskQueue.PopItem;
  1408. if fCurrentTask <> nil then
  1409. try
  1410. fStatus := TWorkerStatus.wsWorking;
  1411. try
  1412. fCurrentIdTask := fCurrentTask.GetIdTask;
  1413. SetFaultPolicy(TTask(fCurrentTask));
  1414. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1415. else fCurrentTask.DoExecute;
  1416. except
  1417. on E : Exception do
  1418. begin
  1419. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1420. else raise ETaskExecutionError.Create(e.Message);
  1421. end;
  1422. end;
  1423. finally
  1424. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1425. else fCurrentTask.DoTerminate;
  1426. fStatus := TWorkerStatus.wsIdle;
  1427. end;
  1428. end;
  1429. fStatus := TWorkerStatus.wsSuspended
  1430. end;
  1431. { TScheduledWorker }
  1432. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1433. begin
  1434. inherited Create;
  1435. {$IFNDEF DELPHILINUX}
  1436. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1437. {$ENDIF}
  1438. FreeOnTerminate := True;
  1439. fCurrentTask := aScheduledTask;
  1440. end;
  1441. procedure TScheduledWorker.Execute;
  1442. begin
  1443. fStatus := TWorkerStatus.wsIdle;
  1444. if Assigned(fCurrentTask) then
  1445. begin
  1446. try
  1447. fStatus := TWorkerStatus.wsWorking;
  1448. try
  1449. SetFaultPolicy(TTask(fCurrentTask));
  1450. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1451. else fCurrentTask.DoExecute;
  1452. fStatus := TWorkerStatus.wsIdle;
  1453. except
  1454. on E : Exception do
  1455. begin
  1456. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1457. else raise ETaskExecutionError.Create(e.Message);
  1458. end;
  1459. end;
  1460. finally
  1461. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1462. else fCurrentTask.DoTerminate;
  1463. //check if expired
  1464. if (fCurrentTask as IScheduledTask).IsFinished then
  1465. begin
  1466. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1467. else (fCurrentTask as IScheduledTask).DoExpire;
  1468. end;
  1469. end;
  1470. end;
  1471. fCurrentTask := nil;
  1472. fStatus := TWorkerStatus.wsSuspended;
  1473. end;
  1474. procedure TScheduledWorker.ExpireTask;
  1475. begin
  1476. (fCurrentTask as IScheduledTask).DoExpire;
  1477. end;
  1478. { TScheduledTasks }
  1479. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1480. begin
  1481. Result := AddTask(aTaskName,[],False,aTaskProc);
  1482. end;
  1483. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1484. var
  1485. scheduletask : TScheduledTask;
  1486. begin
  1487. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1488. scheduletask.Name := aTaskName;
  1489. Inc(fNumPushedTasks);
  1490. scheduletask.SetIdTask(fNumPushedTasks);
  1491. fTaskList.Add(scheduletask);
  1492. Result := scheduletask;
  1493. end;
  1494. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1495. begin
  1496. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1497. TTask(Result).ExecuteWithSync := True;
  1498. end;
  1499. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1500. begin
  1501. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1502. end;
  1503. constructor TScheduledTasks.Create;
  1504. begin
  1505. fNumPushedTasks := 0;
  1506. fIsStarted := False;
  1507. fFaultPolicy := TFaultPolicy.Create;
  1508. fTaskList := TScheduledTaskList.Create;
  1509. end;
  1510. destructor TScheduledTasks.Destroy;
  1511. begin
  1512. if Assigned(fScheduler) then
  1513. begin
  1514. fScheduler.Terminate;
  1515. fScheduler.WaitFor;
  1516. fScheduler.Free;
  1517. end;
  1518. if Assigned(fTaskList) then fTaskList.Free;
  1519. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1520. inherited;
  1521. end;
  1522. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1523. begin
  1524. Result := fScheduler.Get(aIdTask);
  1525. end;
  1526. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1527. begin
  1528. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1529. Result := fScheduler.Get(aTaskName);
  1530. end;
  1531. procedure TScheduledTasks.Start;
  1532. begin
  1533. if fIsStarted then Exit;
  1534. if not Assigned(fScheduler) then
  1535. begin
  1536. fScheduler := TScheduler.Create(fTaskList);
  1537. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1538. end;
  1539. fScheduler.Start;
  1540. fIsStarted := True;
  1541. end;
  1542. procedure TScheduledTasks.Stop;
  1543. begin
  1544. if Assigned(fScheduler) then fScheduler.Terminate;
  1545. fIsStarted := False;
  1546. end;
  1547. { TScheduledTask }
  1548. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1549. begin
  1550. Result := Self;
  1551. SetParam(aName,aValue);
  1552. end;
  1553. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1554. begin
  1555. Result := Self;
  1556. SetParam(aName,aValue);
  1557. end;
  1558. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1559. begin
  1560. Result := Self;
  1561. ClearSchedule;
  1562. fScheduleMode := TScheduleMode.smRunOnce;
  1563. fStartDate := aStartDate;
  1564. fNextExecution := aStartDate;
  1565. end;
  1566. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1567. begin
  1568. Result := Self;
  1569. ClearSchedule;
  1570. fScheduleMode := TScheduleMode.smRunOnce;
  1571. fStartDate := IncMinute(Now(),aMinutes);
  1572. fNextExecution := fStartDate;
  1573. end;
  1574. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1575. begin
  1576. Result := Self;
  1577. ClearSchedule;
  1578. fScheduleMode := TScheduleMode.smRunOnce;
  1579. fStartDate := IncSecond(Now(),aSeconds);
  1580. fNextExecution := fStartDate;
  1581. end;
  1582. function TScheduledTask.StartNow: IScheduledTask;
  1583. begin
  1584. Result := Self;
  1585. ClearSchedule;
  1586. fScheduleMode := TScheduleMode.smRunOnce;
  1587. fStartDate := Now();
  1588. fNextExecution := fStartDate;
  1589. end;
  1590. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1591. begin
  1592. Result := Self;
  1593. ClearSchedule;
  1594. fScheduleMode := TScheduleMode.smRunOnce;
  1595. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1596. fNextExecution := fStartDate;
  1597. end;
  1598. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1599. begin
  1600. Result := Self;
  1601. ClearSchedule;
  1602. fScheduleMode := TScheduleMode.smRunOnce;
  1603. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1604. fNextExecution := fStartDate;
  1605. end;
  1606. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1607. begin
  1608. Result := Self;
  1609. ClearSchedule;
  1610. fScheduleMode := TScheduleMode.smRunOnce;
  1611. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1612. fNextExecution := fStartDate;
  1613. end;
  1614. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1615. begin
  1616. Result := Self;
  1617. SetRetryPolicy(aMaxRetries,0,1);
  1618. end;
  1619. function TScheduledTask.RetryForever: IScheduledTask;
  1620. begin
  1621. Result := Self;
  1622. SetRetryPolicy(-1,0,1);
  1623. end;
  1624. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1625. begin
  1626. Result := Self;
  1627. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1628. end;
  1629. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1630. begin
  1631. Result := Self;
  1632. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1633. end;
  1634. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1635. begin
  1636. Result := Self;
  1637. SetRetryPolicy(aWaitTimeArray);
  1638. end;
  1639. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1640. begin
  1641. Result := Self;
  1642. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1643. end;
  1644. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1645. begin
  1646. Result := Self;
  1647. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1648. end;
  1649. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1650. begin
  1651. if fStartDate = 0.0 then ClearSchedule;
  1652. fScheduleMode := TScheduleMode.smRepeatMode;
  1653. fTimeMeasure := aTimeMeasure;
  1654. fTimeInterval := aInterval;
  1655. if fStartDate < Now() then fStartDate := Now();
  1656. fNextExecution := fStartDate;
  1657. fEnabled := True;
  1658. end;
  1659. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1660. begin
  1661. if fStartDate = 0.0 then ClearSchedule;
  1662. fScheduleMode := TScheduleMode.smRepeatMode;
  1663. fTimeMeasure := aTimeMeasure;
  1664. fTimeInterval := aInterval;
  1665. if fStartDate < Now() then fStartDate := Now();
  1666. fExpirationDate := aEndTime;
  1667. fNextExecution := fStartDate;
  1668. fEnabled := True;
  1669. end;
  1670. procedure TScheduledTask.RepeatEveryDay;
  1671. begin
  1672. RepeatEvery(1,tmDays);
  1673. end;
  1674. procedure TScheduledTask.RepeatEveryWeek;
  1675. begin
  1676. RepeatEvery(7,tmDays);
  1677. end;
  1678. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1679. begin
  1680. if fStartDate = 0.0 then ClearSchedule;
  1681. fScheduleMode := TScheduleMode.smRepeatMode;
  1682. fTimeMeasure := aTimeMeasure;
  1683. fTimeInterval := aInterval;
  1684. if fStartDate < Now() then fStartDate := Now();
  1685. fExpirationTimes := aRepeatTimes;
  1686. fNextExecution := fStartDate;
  1687. fEnabled := True;
  1688. end;
  1689. procedure TScheduledTask.RunOnce;
  1690. begin
  1691. fScheduleMode := TScheduleMode.smRunOnce;
  1692. if fStartDate < Now() then fStartDate := Now();
  1693. fNextExecution := fStartDate;
  1694. fEnabled := True;
  1695. end;
  1696. procedure TScheduledTask.Cancel;
  1697. begin
  1698. fFinished := True;
  1699. end;
  1700. function TScheduledTask.CheckSchedule: Boolean;
  1701. begin
  1702. Result := False;
  1703. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1704. if fScheduleMode = TScheduleMode.smRunOnce then
  1705. begin
  1706. //if start date reached
  1707. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1708. begin
  1709. fLastExecution := Now();
  1710. Inc(fExecutionTimes);
  1711. fFinished := True;
  1712. Result := True;
  1713. end;
  1714. end
  1715. else
  1716. begin
  1717. //if next execution reached
  1718. if Now() >= fNextExecution then
  1719. begin
  1720. //check expiration limits
  1721. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1722. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1723. begin
  1724. fFinished := True;
  1725. Exit;
  1726. end;
  1727. //calculate next execution
  1728. case fTimeMeasure of
  1729. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1730. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1731. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1732. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1733. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1734. end;
  1735. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1736. else
  1737. begin
  1738. fLastExecution := Now();
  1739. Inc(fExecutionTimes);
  1740. Result := True;
  1741. end;
  1742. end;
  1743. end;
  1744. end;
  1745. procedure TScheduledTask.ClearSchedule;
  1746. begin
  1747. inherited;
  1748. fFinished := False;
  1749. fStartDate := 0.0;
  1750. fLastExecution := 0.0;
  1751. fNextExecution := 0.0;
  1752. fExpirationDate := 0.0;
  1753. fScheduleMode := TScheduleMode.smRunOnce;
  1754. fTimeMeasure := TTimeMeasure.tmSeconds;
  1755. fTimeInterval := 0;
  1756. end;
  1757. procedure TScheduledTask.DoExpire;
  1758. begin
  1759. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1760. fEnabled := False;
  1761. end;
  1762. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1763. begin
  1764. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1765. end;
  1766. function TScheduledTask.GetTaskName: string;
  1767. begin
  1768. Result := fName;
  1769. end;
  1770. function TScheduledTask.IsFinished: Boolean;
  1771. begin
  1772. Result := fFinished;
  1773. end;
  1774. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1775. begin
  1776. fExceptProc := aTaskProc;
  1777. Result := Self;
  1778. end;
  1779. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1780. begin
  1781. Result := OnException(aTaskProc);
  1782. TTask(Result).ExceptionWithSync := True;
  1783. end;
  1784. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1785. begin
  1786. fRetryProc := aTaskProc;
  1787. Result := Self;
  1788. end;
  1789. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1790. begin
  1791. fExpiredProc := aTaskProc;
  1792. Result := Self;
  1793. end;
  1794. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1795. begin
  1796. Result := OnExpired(aTaskProc);
  1797. TScheduledTask(Result).ExpireWithSync := True;
  1798. end;
  1799. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1800. begin
  1801. fInitializeProc := aTaskProc;
  1802. Result := Self;
  1803. end;
  1804. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1805. begin
  1806. fTerminateProc := aTaskProc;
  1807. Result := Self;
  1808. end;
  1809. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1810. begin
  1811. Result := OnTerminated(aTaskProc);
  1812. TTask(Result).TerminateWithSync := True;
  1813. end;
  1814. { TScheduler }
  1815. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1816. begin
  1817. inherited Create(True);
  1818. FreeOnTerminate := False;
  1819. fListLock := TCriticalSection.Create;
  1820. fRemoveTaskAfterExpiration := False;
  1821. fTaskList := aTaskList;
  1822. {$IFDEF FPC}
  1823. fCondVar := TSimpleEvent.Create;
  1824. {$ELSE}
  1825. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1826. {$ENDIF}
  1827. end;
  1828. destructor TScheduler.Destroy;
  1829. begin
  1830. fCondVar.SetEvent;
  1831. fCondVar.Free;
  1832. fTaskList := nil;
  1833. fListLock.Free;
  1834. inherited;
  1835. end;
  1836. procedure TScheduler.Execute;
  1837. var
  1838. task : IScheduledTask;
  1839. worker : TScheduledWorker;
  1840. numworker : Int64;
  1841. begin
  1842. numworker := 0;
  1843. while not Terminated do
  1844. begin
  1845. fListLock.Enter;
  1846. try
  1847. for task in fTaskList do
  1848. begin
  1849. if (task.IsEnabled) and (not task.IsFinished) then
  1850. begin
  1851. if task.CheckSchedule then
  1852. begin
  1853. Inc(numworker);
  1854. worker := TScheduledWorker.Create(numworker,task);
  1855. worker.Start;
  1856. end;
  1857. end
  1858. else
  1859. begin
  1860. if task.IsEnabled then
  1861. begin
  1862. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1863. // else task.DoExpire;
  1864. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1865. end;
  1866. end;
  1867. end;
  1868. task := nil;
  1869. finally
  1870. fListLock.Leave;
  1871. end;
  1872. fCondVar.WaitFor(250);
  1873. end;
  1874. end;
  1875. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1876. begin
  1877. Result := fTaskList.Add(aTask);
  1878. end;
  1879. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1880. var
  1881. task : IScheduledTask;
  1882. begin
  1883. fListLock.Enter;
  1884. try
  1885. for task in fTaskList do
  1886. begin
  1887. if task.IdTask = aIdTask then Exit(task);
  1888. end;
  1889. finally
  1890. fListLock.Leave;
  1891. end;
  1892. end;
  1893. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1894. var
  1895. task : IScheduledTask;
  1896. begin
  1897. fListLock.Enter;
  1898. try
  1899. for task in fTaskList do
  1900. begin
  1901. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1902. end;
  1903. finally
  1904. fListLock.Leave;
  1905. end;
  1906. end;
  1907. { TAdvThread }
  1908. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1909. begin
  1910. inherited Create(True);
  1911. FreeOnTerminate := True;
  1912. fExecuteWithSync := aSynchronize;
  1913. fExecuteProc := aProc;
  1914. end;
  1915. procedure TAdvThread.DoExecute;
  1916. begin
  1917. try
  1918. if Assigned(fExecuteProc) then fExecuteProc;
  1919. except
  1920. on E : Exception do
  1921. begin
  1922. {$IFNDEF FPC}
  1923. {$IF DELPHIRX10_UP}
  1924. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  1925. {$ELSE}
  1926. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1927. {$ENDIF}
  1928. {$ELSE}
  1929. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1930. {$ENDIF}
  1931. else raise e;
  1932. end;
  1933. end;
  1934. end;
  1935. procedure TAdvThread.CallToTerminate;
  1936. begin
  1937. if Assigned(fTerminateProc) then fTerminateProc;
  1938. end;
  1939. procedure TAdvThread.DoTerminate;
  1940. begin
  1941. if fTerminateWithSync then Synchronize(CallToTerminate)
  1942. else CallToTerminate;
  1943. end;
  1944. procedure TAdvThread.Execute;
  1945. begin
  1946. if fExecuteWithSync then Synchronize(DoExecute)
  1947. else DoExecute;
  1948. end;
  1949. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  1950. begin
  1951. fExceptionProc := aProc;
  1952. end;
  1953. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1954. begin
  1955. fTerminateWithSync := aSynchronize;
  1956. fTerminateProc := aProc;
  1957. end;
  1958. { TRunTask }
  1959. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  1960. begin
  1961. Result := Execute([],False,aTaskProc);
  1962. end;
  1963. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  1964. begin
  1965. Result := Execute_Sync([],False,aTaskProc);
  1966. end;
  1967. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1968. var
  1969. task : TWorkTask;
  1970. worker : TSimpleWorker;
  1971. begin
  1972. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1973. task.ExecuteWithSync := False;
  1974. Result := task;
  1975. worker := TSimpleWorker.Create(task);
  1976. worker.Start;
  1977. end;
  1978. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1979. var
  1980. task : TWorkTask;
  1981. worker : TSimpleWorker;
  1982. begin
  1983. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1984. task.ExecuteWithSync := True;
  1985. Result := task;
  1986. worker := TSimpleWorker.Create(task);
  1987. worker.Start;
  1988. end;
  1989. { TParamValue }
  1990. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  1991. begin
  1992. inherited Create;
  1993. fName := aName;
  1994. fValue := aValue;
  1995. fOwned := aOwnedValue;
  1996. end;
  1997. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  1998. begin
  1999. inherited Create;
  2000. fName := aName;
  2001. fValue := aValue;
  2002. fOwned := aOwnedValue;
  2003. end;
  2004. constructor TParamValue.Create;
  2005. begin
  2006. fName := '';
  2007. fOwned := False;
  2008. end;
  2009. destructor TParamValue.Destroy;
  2010. begin
  2011. {$IFDEF FPC}
  2012. fValue._Release;
  2013. {$ENDIF}
  2014. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  2015. inherited;
  2016. end;
  2017. end.