2
0

Quick.Threads.pas 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275
  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 22/08/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  // TThreadedQueueCS<T>: generic queue class whose state is a dynamic array of T
  // plus a TCriticalSection and a condition object (see the private fields).
  // Only the declaration is visible in this part of the unit; the actual
  // locking/waiting behaviour is in the implementation section.
  // NOTE(review): presumably a bounded blocking queue - confirm in the implementation.
  43. TThreadedQueueCS<T> = class
  44. private
  // Item storage and bookkeeping counters.
  // NOTE(review): FQueueOffset looks like the read position inside FQueue - confirm.
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  // The condition object type depends on the compiler: TEventObject when FPC is
  // defined, TConditionVariableCS otherwise.
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  // Defaults: depth 16, both timeouts INFINITE.
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  // PushItem overloads return a TWaitResult; the second one also reports a queue
  // size through the var parameter AQueueSize.
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  // PopItem overloads: two return the item directly, two return a TWaitResult and
  // deliver the item through the var parameter AItem.
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  // Read-only views of the private fields (direct field reads, no getter methods).
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  // TThreadedQueueList<T>: generic queue class with the same public method set as
  // TThreadedQueueCS<T> except that no Clear method is declared. Storage is a
  // TQueue<T> instead of a dynamic array.
  // Only the declaration is visible here; behaviour is defined in the
  // implementation section.
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  // Condition object type depends on the compiler: TSimpleEvent when FPC is
  // defined, TConditionVariableCS otherwise.
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  // Defaults: depth 10 (the array-based variant defaults to 16), both timeouts INFINITE.
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  // The properties below name the fields with an upper-case F; Pascal identifiers
  // are case-insensitive, so they bind to the fXxx fields declared above.
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  // TThreadObjectList<T> is only compiled when FPC is not defined.
  // It descends from TList<T> and additionally holds a TObjectList<T> (fList) and
  // a lock object (fLock).
  // NOTE(review): Add, Clear, Remove and RemoveItem are declared without
  // `override`, so they appear to hide same-named TList<T> members rather than
  // override them - confirm this is intended.
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  // Accessors backing the default Items property.
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  // LockList returns the inner TObjectList<T>; UnlockList is its counterpart.
  // NOTE(review): presumably callers must pair LockList/UnlockList - confirm.
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  // Callback types used by the thread classes below.
  // Non-FPC branch: anonymous-method ("reference to") types; TAnonProc aliases TProc.
  // TProc itself is not declared in this branch - presumably it comes from SysUtils.
  // FPC branch: method-pointer ("of object") types; TProc is declared here and
  // TAnonProc is not declared at all.
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  // Status enumeration; no use of it is visible in this part of the unit.
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  // TAdvThread: TThread descendant that stores three callbacks (execute,
  // exception, terminate) and two flags naming whether execute/terminate are to
  // be run "with sync".
  // NOTE(review): "sync" presumably means TThread.Synchronize - confirm in the
  // implementation.
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  // Overrides TThread.DoTerminate.
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  // OnException/OnTerminate are procedures taking a callback (not event properties).
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  // IAnonymousThread: interface with Start plus three callback setters.
  // Each setter returns IAnonymousThread, which allows call chaining.
  // No GUID is declared for this interface.
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  // TAnonymousThread: TInterfacedObject implementing IAnonymousThread; holds a
  // TAdvThread in fThread.
  // The constructor is declared in the private section; the public class
  // functions Execute / Execute_Sync return an IAnonymousThread.
  // NOTE(review): presumably Execute/Execute_Sync are the intended way to create
  // instances - confirm in the implementation.
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  // TParamValue: a named TFlexValue with an "owned" flag.
  // All three fields are exposed as read/write properties bound directly to the
  // fields.
  // NOTE(review): what Owned implies for the destructor is not visible here.
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  // Three constructors: parameterless, from a TFlexValue, and from a TVarRec.
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  // Supporting types for the task classes declared below.
  // TParamList: object list of TParamValue.
  182. TParamList = TObjectList<TParamValue>;
  // Task state enumeration (returned by ITask.TaskStatus).
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  // Scheduling enumerations; TTimeMeasure is the unit parameter of
  // IScheduledTask.RepeatEvery.
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  // Task-related exception classes, all derived directly from Exception.
  186. ETaskInitializationError = class(Exception);
  187. ETaskExecutionError = class(Exception);
  188. ETaskParamError = class(Exception);
  189. ETaskSchedulerError = class(Exception);
  // ITask: base task interface. Exposes parameters (by index or by name) as
  // TFlexValue, a worker number, a task id, a result value, and status/retry
  // query functions.
  190. ITask = interface
  191. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  // Parameter accessors. GetParam2 returns a PFlexValue instead of a TFlexValue.
  192. function GetParam(aIndex : Integer) : TFlexValue; overload;
  193. function GetParam(const aName : string) : TFlexValue; overload;
  194. function GetParam2(aIndex : Integer) : PFlexValue;
  195. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  196. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  197. function TaskStatus : TWorkTaskStatus;
  198. function GetNumWorker : Integer;
  199. procedure SetNumWorker(Value : Integer);
  // SetIdTask is part of the interface although the IdTask property below is
  // declared read-only.
  200. function GetIdTask : Int64;
  201. procedure SetIdTask(Value : Int64);
  202. function GetResult : TFlexValue;
  203. procedure SetResult(aValue : TFlexValue);
  204. procedure DoExecute;
  205. procedure DoException(aException : Exception);
  206. procedure DoTerminate;
  // Non-FPC: two default array properties, both named Param (indexed by Integer
  // and by string).
  // FPC: Param (by index, not default) and ParamByName (by name, default).
  207. {$IFNDEF FPC}
  208. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  209. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  210. {$ELSE}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  212. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ENDIF}
  214. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  215. property Result : TFlexValue read GetResult write SetResult;
  216. property IdTask : Int64 read GetIdTask;
  // Status and retry queries.
  217. function Done : Boolean;
  218. function Failed : Boolean;
  219. function NumRetries : Integer;
  220. function MaxRetries : Integer;
  221. function LastException : Exception;
  222. function CircuitBreaked : Boolean;
  223. function IsEnabled : Boolean;
  224. end;
  // Task callback types. Every callback receives the ITask it belongs to.
  // Non-FPC branch: anonymous-method ("reference to") types.
  // FPC branch: method-pointer ("of object") types with the same parameter lists.
  // TTaskRetryProc has a var parameter aStopRetries that the callback can set.
  225. {$IFNDEF FPC}
  226. TTaskProc = reference to procedure(task : ITask);
  227. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  228. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  229. {$ELSE}
  230. TTaskProc = procedure(task : ITask) of object;
  231. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  232. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  233. {$ENDIF}
  // IWorkTask: extends ITask with callback setters, retry configuration and
  // parameter setters. All of these return IWorkTask, which allows call
  // chaining; Run is the only procedure.
  // No GUID is declared for this interface (ITask and IScheduledTask declare one).
  234. IWorkTask = interface(ITask)
  235. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  236. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  237. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  238. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  // Retry configuration. Wait times are named in milliseconds (MS suffix).
  239. function Retry(aMaxRetries : Integer) : IWorkTask;
  240. function RetryForever : IWorkTask;
  241. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  242. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  243. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  244. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  // NOTE(review): this "Forever" overload also takes aMaxRetries - confirm how
  // the implementation uses it.
  245. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  246. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  247. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  248. procedure Run;
  249. end;
  // IScheduledTask: extends ITask with callback setters (plain and _Sync
  // variants), retry configuration, start-time selection and repeat settings.
  // The functions that return IScheduledTask allow call chaining; RunOnce and
  // the RepeatEvery* members are procedures and therefore end a chain.
  250. IScheduledTask = interface(ITask)
  251. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  // Callback setters.
  // NOTE(review): the _Sync variants presumably run the callback synchronized
  // with the main thread - confirm in the implementation.
  252. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  253. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  254. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  255. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  256. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  257. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  259. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  // Retry configuration (same shape as in IWorkTask).
  260. function Retry(aMaxRetries : Integer) : IScheduledTask;
  261. function RetryForever : IScheduledTask;
  262. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  263. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  264. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  265. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  // NOTE(review): this "Forever" overload also takes aMaxRetries - confirm how
  // the implementation uses it.
  266. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  267. function CheckSchedule : Boolean;
  268. procedure DoExpire;
  269. function GetTaskName : string;
  // Start-time selection. aSecond defaults to 0 in the *At variants.
  270. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  271. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  272. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  273. function StartOnDayChange : IScheduledTask;
  274. function StartNow : IScheduledTask;
  275. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  276. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // Repeat settings. RepeatEvery has three overloads: unbounded, bounded by an
  // end time, bounded by a repeat count.
  277. procedure RunOnce;
  278. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  279. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  280. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  281. procedure RepeatEveryDay;
  282. procedure RepeatEveryWeek;
  283. function IsFinished : Boolean;
  284. procedure Cancel;
  // Read-only name, served by GetTaskName.
  285. property Name : string read GetTaskName;
  286. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  287. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  288. end;
  // TTask: TInterfacedObject implementing ITask. Holds the task id, worker
  // number, parameter list, callbacks, sync flags, a TFaultControl and the
  // result value.
  // The ITask accessor methods are declared in the private section, so they are
  // reachable from outside this unit only through the interface.
  289. TTask = class(TInterfacedObject,ITask)
  290. private
  291. fIdTask : Int64;
  292. fNumWorker : Integer;
  293. fNumRetries : Integer;
  294. fParamList : TParamList;
  // Callback slots.
  295. fInitializeProc : TTaskProc;
  296. fExecuteProc : TTaskProc;
  297. fExceptProc : TTaskExceptionProc;
  298. fTerminateProc : TTaskProc;
  299. fExpiredProc : TTaskProc;
  300. fTaskStatus : TWorkTaskStatus;
  301. fOwnedParams : Boolean;
  302. fEnabled : Boolean;
  303. fExecuteWithSync : Boolean;
  304. fExceptionWithSync : Boolean;
  305. fRetryProc : TTaskRetryProc;
  306. fTerminateWithSync : Boolean;
  307. fFaultControl : TFaultControl;
  308. fCustomFaultPolicy : Boolean;
  309. fResult : TFlexValue;
  310. function GetParam(aIndex : Integer) : TFlexValue; overload;
  311. function GetParam(const aName : string) : TFlexValue; overload;
  312. function GetParam2(aIndex : Integer) : PFlexValue;
  313. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  314. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  // Extra overload with an aOwned flag; it is not part of ITask.
  315. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  // DoInitialize is not part of ITask; DoExecute/DoException/DoTerminate are.
  316. procedure DoInitialize;
  317. procedure DoExecute;
  318. procedure DoException(aException : Exception);
  319. procedure DoTerminate;
  320. function GetNumWorker : Integer;
  321. procedure SetNumWorker(Value : Integer);
  322. function GetIdTask : Int64;
  323. procedure SetIdTask(Value : Int64);
  324. function GetResult : TFlexValue;
  325. procedure SetResult(aValue : TFlexValue);
  326. protected
  // Protected properties map directly onto the private fields so descendants
  // can read and write them.
  327. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  328. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  329. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  330. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  331. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  332. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  333. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  334. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  335. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  336. public
  // Virtual constructor: open array of parameters, ownership flag, and the
  // procedure to execute.
  337. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  338. destructor Destroy; override;
  339. property IdTask : Int64 read GetIdTask;
  340. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  341. function IsEnabled : Boolean;
  342. function TaskStatus : TWorkTaskStatus;
  343. function Done : Boolean;
  344. function Failed : Boolean;
  345. function NumRetries : Integer;
  346. function MaxRetries : Integer;
  347. function LastException : Exception;
  348. function CircuitBreaked : Boolean;
  349. end;
  // TWorkTask: TTask descendant implementing IWorkTask. Declares the IWorkTask
  // members publicly; OnException, OnTerminated, OnRetry and Run are virtual,
  // the remaining members are static.
  350. TWorkTask = class(TTask,IWorkTask)
  351. public
  352. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  353. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  354. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  355. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  // Retry configuration; signatures match IWorkTask.
  356. function Retry(aMaxRetries : Integer) : IWorkTask;
  357. function RetryForever : IWorkTask;
  358. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  359. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  360. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  361. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  362. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  363. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  364. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  365. procedure Run; virtual;
  366. end;
  // TTaskQueue: the array-based queue class specialised for IWorkTask items.
  367. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  // TScheduledTask: TTask descendant implementing IScheduledTask. Adds a name
  // and scheduling state (mode, interval, start/last/next execution,
  // expiration) to the base task.
  // CheckSchedule, DoExpire and GetTaskName are declared private here but are
  // part of IScheduledTask, so they are callable through the interface.
  368. TScheduledTask = class(TTask,IScheduledTask)
  369. private
  370. fName : string;
  371. fExecutionTimes : Integer;
  372. fScheduleMode : TScheduleMode;
  373. fTimeInterval : Integer;
  374. fTimeMeasure : TTimeMeasure;
  375. fStartDate : TDateTime;
  376. fLastExecution : TDateTime;
  377. fNextExecution : TDateTime;
  378. fExpirationDate : TDateTime;
  379. fExpirationTimes : Integer;
  380. fFinished : Boolean;
  381. fExpireWithSync: Boolean;
  382. procedure ClearSchedule;
  383. function CheckSchedule : Boolean;
  384. procedure DoExpire;
  385. function GetTaskName : string;
  386. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  387. protected
  388. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  389. public
  // The class-level Name property is read/write on fName; the interface-level
  // Name property is read-only via GetTaskName.
  390. property Name : string read fName write fName;
  391. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  // CurrentSchedule is a (time measure, integer) pair produced by GetCurrentSchedule.
  392. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  // Callback setters; all declared virtual.
  393. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  394. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  395. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  396. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  397. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  398. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  399. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  // Start-time selection; signatures match IScheduledTask.
  400. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  401. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  402. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  403. function StartOnDayChange : IScheduledTask;
  404. function StartNow : IScheduledTask;
  405. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  406. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // Repeat settings.
  407. procedure RunOnce;
  408. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  409. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  410. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  411. procedure RepeatEveryDay;
  412. procedure RepeatEveryWeek;
  // Retry configuration.
  413. function Retry(aMaxRetries : Integer) : IScheduledTask;
  414. function RetryForever : IScheduledTask;
  415. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  416. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  417. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  418. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  419. function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  420. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  421. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  422. function IsFinished : Boolean;
  423. procedure Cancel;
  424. end;
  // Worker state enumeration; it is the type of TWorker.Status.
  425. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  // TWorker: TThread descendant and base class of the worker classes declared
  // after it. Keeps its status, the current ITask and a default TFaultPolicy in
  // protected fields so descendants can access them.
  426. TWorker = class(TThread)
  427. protected
  428. fStatus : TWorkerStatus;
  429. fCurrentTask : ITask;
  430. fDefaultFaultPolicy : TFaultPolicy;
  431. procedure ExecuteTask;
  432. procedure TerminateTask;
  433. public
  434. constructor Create;
  435. destructor Destroy; override;
  436. property Status : TWorkerStatus read fStatus;
  // Takes the concrete TTask class, not the ITask interface.
  437. procedure SetFaultPolicy(aTask : TTask);
  438. procedure Execute; override;
  439. end;
  // TSimpleWorker: TWorker descendant constructed with a single ITask; overrides
  // Execute.
  440. TSimpleWorker = class(TWorker)
  441. public
  442. constructor Create(aTask : ITask);
  443. procedure Execute; override;
  444. end;
  // TQueueWorker: TWorker descendant constructed with a worker number and a
  // TTaskQueue; overrides Execute.
  // NOTE(review): presumably Execute pops tasks from fTaskQueue - confirm in the
  // implementation.
  445. TQueueWorker = class(TWorker)
  446. private
  // NOTE(review): fCurrentIdTask is an Integer while ITask.GetIdTask returns
  // Int64 - confirm no truncation can occur where it is assigned.
  447. fCurrentIdTask : Integer;
  448. fNumWorker : Integer;
  449. fTaskQueue : TTaskQueue;
  450. public
  451. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  452. property NumWorker : Integer read fNumWorker;
  453. procedure Execute; override;
  454. end;
  // TScheduledWorker: TWorker descendant constructed with a worker number and an
  // IScheduledTask; overrides Execute and adds a private ExpireTask procedure.
  455. TScheduledWorker = class(TWorker)
  456. private
  457. procedure ExpireTask;
  458. public
  459. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  460. procedure Execute; override;
  461. end;
  // TWorkerPool: object list of TWorker instances.
  462. TWorkerPool = TObjectList<TWorker>;
  // TRunTask: class with only class functions. Each one takes a task procedure
  // (optionally with a parameter array and ownership flag) and returns an
  // IWorkTask.
  // NOTE(review): the _Sync variants presumably execute the procedure
  // synchronized with the main thread - confirm in the implementation.
  463. TRunTask = class
  464. public
  465. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  466. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  467. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  468. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  469. end;
  // TBackgroundTasks: owns a TTaskQueue and a TWorkerPool. AddTask / AddTask_Sync
  // return an IWorkTask for the supplied procedure; Start and CancelAll control
  // processing.
  // Only the declaration is visible here; how workers are created and fed is in
  // the implementation section.
  470. TBackgroundTasks = class
  471. private
  472. fMaxQueue : Integer;
  473. fWorkerPool : TWorkerPool;
  474. fConcurrentWorkers : Integer;
  475. fInsertTimeout : Cardinal;
  476. fExtractTimeout : Cardinal;
  477. fTaskQueue : TTaskQueue;
  478. fNumPushedTasks : Int64;
  // Getter behind the TaskQueued property.
  479. function GetTaskQueue : Cardinal;
  480. public
  // aMaxQueue defaults to 100.
  481. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  482. destructor Destroy; override;
  483. property MaxQueue : Integer read fMaxQueue;
  484. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  485. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  486. property TaskQueued : Cardinal read GetTaskQueue;
  487. property NumPushedTasks : Int64 read fNumPushedTasks;
  // Writes go straight to the field.
  // NOTE(review): whether a change after Start has any effect is not visible here.
  488. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  489. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  490. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  491. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  492. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  493. procedure Start;
  494. procedure CancelAll;
  495. end;
  496. TScheduledTaskList = TList<IScheduledTask>; // list of scheduled tasks held as interface references
  497. TScheduler = class(TThread) // thread working on a TScheduledTaskList
  498. private
  499. fListLock : TCriticalSection; // presumably guards fTaskList - confirm in the implementation
  500. fCondVar : TSimpleEvent; // presumably used to wait/wake the scheduler loop - confirm in the implementation
  501. fTaskList : TScheduledTaskList; // task list received in the constructor
  502. fRemoveTaskAfterExpiration : Boolean;
  503. public
  504. constructor Create(aTaskList : TScheduledTaskList);
  505. destructor Destroy; override;
  506. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  507. procedure Execute; override;
  508. function Add(aTask : TScheduledTask) : Integer;
  509. function Get(aIdTask : Int64) : IScheduledTask; overload; // lookup by task id
  510. function Get(const aTaskName : string) : IScheduledTask; overload; // lookup by task name
  511. end;
  512. TScheduledTasks = class // registry of named scheduled tasks plus the TScheduler that works on them
  513. private
  514. fTaskList : TScheduledTaskList; // tasks added through AddTask
  515. fScheduler : TScheduler;
  516. fNumPushedTasks : Int64; // incremented per AddTask call and used as the task id
  517. fRemoveTaskAfterExpiration : Boolean;
  518. fIsStarted : Boolean;
  519. fFaultPolicy : TFaultPolicy;
  520. public
  521. constructor Create;
  522. destructor Destroy; override;
  523. property NumPushedTasks : Int64 read fNumPushedTasks;
  524. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  525. property IsStarted : Boolean read fIsStarted;
  526. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  527. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload; // registers a task without parameters
  528. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  529. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload; // registers a task with parameters
  530. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload; // same, with ExecuteWithSync set
  531. function GetTask(aIdTask : Int64) : IScheduledTask; overload; // lookup by task id
  532. function GetTask(const aTaskName : string) : IScheduledTask; overload; // lookup by task name
  533. procedure Start;
  534. procedure Stop;
  535. end;
  536. implementation
  537. { TThreadedQueueCS<T> }
  { Empties the queue. When T is TObject every stored object is disposed first.
    All slots are then released, the ring-buffer counters are reset and every
    thread waiting on the queue is woken. The allocated depth is preserved. }
  procedure TThreadedQueueCS<T>.Clear;
  var
    obj : T;
    depth : Integer;
  begin
    FQueueLock.Enter;
    try
      for obj in FQueue do
      begin
        if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
      end;
      //release every slot (finalizes managed references) but keep the depth:
      //with a zero-length buffer the next PushItem would have no slot to use
      depth := Length(FQueue);
      SetLength(FQueue,0);
      SetLength(FQueue,depth);
      //the buffer is empty now, so the counters must say so; otherwise PopItem
      //would keep reading slots that no longer hold an item
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Creates the queue with AQueueDepth slots and the given push/pop timeouts.
    Raises Exception when AQueueDepth is lower than 10. }
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    //reject small depths before anything is allocated
    if AQueueDepth < 10 then
      raise Exception.Create('QueueDepth will be 10 or greater value');
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
    SetLength(FQueue, AQueueDepth);
    FQueueLock := TCriticalSection.Create;
    //signalling primitive differs per compiler
    {$IFDEF FPC}
    FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  { Flags the queue as shut down (waking any waiter) and releases the lock and
    the signalling object. }
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    //wake waiters first so nobody stays blocked on an object about to be freed
    DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  { Enlarges the ring buffer by ADelta slots and wakes every waiting thread.
    When the queued items wrap around the end of the old buffer, the wrapped
    part is moved into the new space so the items stay contiguous (modulo the
    new length) starting at FQueueOffset; a plain SetLength would leave empty
    slots in the middle of the queue and break FIFO order. }
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    oldlen : Integer;
    newlen : Integer;
    wrapped : Integer;
    i : Integer;
  begin
    FQueueLock.Enter;
    try
      oldlen := Length(FQueue);
      SetLength(FQueue, oldlen + ADelta);
      newlen := Length(FQueue);
      //items stored at the front of the buffer because they wrapped past the old end
      wrapped := FQueueOffset + FQueueSize - oldlen;
      if (newlen > oldlen) and (wrapped > 0) then
      begin
        //ascending order is safe: a target slot is either in the new space or a
        //front slot that an earlier iteration already vacated
        for i := 0 to wrapped - 1 do
        begin
          FQueue[(oldlen + i) mod newlen] := FQueue[i];
          FQueue[i] := Default(T);
        end;
      end;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Pops one item, discarding both the wait result and the remaining size. }
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredsize: Integer;
  begin
    PopItem(ignoredsize, Result);
  end;
  599. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; // takes the item at FQueueOffset; AItem stays Default(T) when nothing is popped
  600. begin
  601. AItem := Default(T);
  602. FQueueLock.Enter;
  603. try
  604. Result := wrSignaled;
  605. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do // wait while the queue is empty, not shut down and the wait keeps being signaled
  606. begin
  607. {$IFDEF FPC}
  608. Result := FQueueCondVar.WaitFor(FPopTimeout); // NOTE(review): FQueueLock is not passed here, so it appears to stay held during the wait - confirm
  609. {$ELSE}
  610. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  611. {$ENDIF}
  612. end;
  613. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit; // nothing to return: shut down and empty, or the wait did not end signaled
  614. AItem := FQueue[FQueueOffset];
  615. FQueue[FQueueOffset] := Default(T); // clear the slot so the buffer keeps no reference to the popped item
  616. if FQueueSize = Length(FQueue) then // queue was full: signal waiters that a slot is about to be free
  617. begin
  618. {$IFDEF FPC}
  619. FQueueCondVar.SetEvent;
  620. {$ELSE}
  621. FQueueCondVar.ReleaseAll;
  622. {$ENDIF}
  623. end;
  624. Dec(FQueueSize);
  625. Inc(FQueueOffset);
  626. Inc(FTotalItemsPopped);
  627. if FQueueOffset = Length(FQueue) then FQueueOffset := 0; // wrap the read position at the end of the buffer
  628. finally
  629. AQueueSize := FQueueSize; // reported on every exit path, including the early Exit
  630. FQueueLock.Leave;
  631. end;
  632. end;
  { Pops one item into AItem and returns the wait result; the remaining queue
    size is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredsize: Integer;
  begin
    Exit(PopItem(ignoredsize, AItem));
  end;
  { Pops one item and reports the remaining queue size in AQueueSize; the wait
    result is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    //Result is filled through the var parameter of the full overload
    PopItem(AQueueSize, Result);
  end;
  { Pushes AItem and returns the wait result; the resulting queue size is
    discarded. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredsize: Integer;
  begin
    Exit(PushItem(AItem, ignoredsize));
  end;
  649. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; // appends AItem to the ring buffer; AQueueSize receives the item count on exit
  650. begin
  651. FQueueLock.Enter;
  652. try
  653. if FQueueSize >= High(FQueue) then // at most one free slot left: enlarge the buffer first
  654. begin
  655. if FQueueSize < 512 then Grow(FQueueSize * 2) // adds FQueueSize * 2 slots while fewer than 512 items are queued
  656. else Grow(10); // otherwise adds 10 slots
  657. end;
  658. Result := wrSignaled;
  659. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do // wait while the buffer is full
  660. begin
  661. {$IFDEF FPC}
  662. Result := FQueueCondVar.WaitFor(FPushTimeout);
  663. {$ELSE}
  664. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  665. {$ENDIF}
  666. end;
  667. if FShutDown or (Result <> wrSignaled) then Exit; // item is not stored; note Result is still wrSignaled when only FShutDown is set
  668. if FQueueSize = 0 then // queue was empty: signal waiters that an item is about to be available
  669. begin
  670. {$IFDEF FPC}
  671. FQueueCondVar.SetEvent;
  672. {$ELSE}
  673. FQueueCondVar.ReleaseAll;
  674. {$ENDIF}
  675. end;
  676. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem; // write position = read position + item count, wrapped
  677. Inc(FQueueSize);
  678. Inc(FTotalItemsPushed);
  679. finally
  680. AQueueSize := FQueueSize; // reported on every exit path, including the early Exit
  681. FQueueLock.Leave;
  682. end;
  683. end;
  { Marks the queue as shut down and wakes every thread waiting on it. }
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FShutDown := True;
    //waiters re-check FShutDown once they are released
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  693. { TThreadedQueueList<T> }
  { Creates the queue on top of a TQueue<T> whose capacity is AQueueDepth and
    stores the push/pop timeouts. }
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    fPushTimeout := PushTimeout;
    fPopTimeout := PopTimeout;
    fQueueSize := 0;
    //backing container
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    //synchronization objects
    fQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create;
    {$ELSE}
    fQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  { Flags the queue as shut down (waking any waiter) and frees the lock, the
    signalling object and the backing queue. }
  destructor TThreadedQueueList<T>.Destroy;
  begin
    //release waiters before the objects they wait on are destroyed
    DoShutDown;
    fQueueLock.Free;
    fQueueCondVar.Free;
    fQueue.Free;
    inherited;
  end;
  { Raises the capacity of the backing queue by ADelta and wakes every waiting
    thread. }
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newcapacity : Integer;
  begin
    fQueueLock.Enter;
    try
      newcapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := newcapacity;
    finally
      fQueueLock.Leave;
    end;
    //signal outside the lock
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Pops one item, discarding both the wait result and the remaining size. }
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredsize: Integer;
  begin
    PopItem(ignoredsize, Result);
  end;
  737. {$IFDEF FPC}
  738. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; // FPC variant: extracts the next item; AItem stays Default(T) when nothing is popped
  739. //var
  740. //crono : TChronometer;
  741. begin
  742. AItem := Default(T);
  743. //crono := TChronometer.Create(False);
  744. try
  745. Result := wrSignaled;
  746. //writeln('popitem');
  747. //crono.Start;
  748. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do // the wait happens before fQueueLock is taken
  749. begin
  750. //crono.Start;
  751. Result := FQueueCondVar.WaitFor(FPopTimeout);
  752. //crono.Stop;
  753. //writeln('in: ' + crono.ElapsedTime);
  754. //if result = twaitresult.wrError then result := twaitresult.wrError;
  755. end;
  756. //crono.Stop;
  757. //writeln('out: ' + crono.ElapsedTime);
  758. fQueueLock.Enter;
  759. try
  760. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit; // NOTE(review): fQueueSize was tested above without the lock; with several consumers it may be 0 again here - confirm
  761. AItem := fQueue.Extract;
  762. Dec(FQueueSize);
  763. Inc(fTotalItemsPopped);
  764. finally
  765. fQueueLock.Leave;
  766. end;
  767. finally
  768. AQueueSize := fQueueSize; // read after the lock has been released
  769. end;
  770. end;
  771. {$ELSE}
  772. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; // Delphi variant: extracts the next item; AItem stays Default(T) when nothing is popped
  773. begin
  774. AItem := Default(T);
  775. fQueueLock.Enter;
  776. try
  777. Result := wrSignaled;
  778. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do // wait while the queue is empty and not shut down
  779. begin
  780. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  781. end;
  782. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit; // nothing to return: shut down and empty, or the wait did not end signaled
  783. AItem := fQueue.Extract;
  784. if fQueueSize = fQueue.Count then // compared after Extract and before Dec(FQueueSize)
  785. begin
  786. FQueueCondVar.ReleaseAll;
  787. end;
  788. Dec(FQueueSize);
  789. Inc(fTotalItemsPopped);
  790. finally
  791. AQueueSize := fQueueSize; // reported on every exit path, including the early Exit
  792. fQueueLock.Leave;
  793. end;
  794. end;
  795. {$ENDIF}
  { Pops one item into AItem and returns the wait result; the remaining queue
    size is discarded. }
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredsize: Integer;
  begin
    Exit(PopItem(ignoredsize, AItem));
  end;
  { Pops one item and reports the remaining queue size in AQueueSize; the wait
    result is discarded. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    //Result is filled through the var parameter of the full overload
    PopItem(AQueueSize, Result);
  end;
  { Pushes AItem and returns the wait result; the resulting queue size is
    discarded. }
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredsize: Integer;
  begin
    Exit(PushItem(AItem, ignoredsize));
  end;
  812. {$IFDEF FPC}
  { FPC variant. Enqueues AItem unless the queue is shut down. It never waits,
    so the result is always wrSignaled; AQueueSize receives the item count on
    exit. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    Result := wrSignaled;
    FQueueLock.Enter;
    try
      //after shutdown the item is silently not stored
      if not fShutDown then
      begin
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  836. {$ELSE}
  { Delphi variant. Enqueues AItem unless the queue is shut down, releasing
    the waiters when the queue was empty. It never waits, so the result is
    always wrSignaled; AQueueSize receives the item count on exit. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    Result := wrSignaled;
    FQueueLock.Enter;
    try
      //after shutdown the item is silently not stored
      if not fShutDown then
      begin
        //queue goes from empty to non-empty: release blocked consumers
        if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  856. {$ENDIF}
  { Marks the queue as shut down and wakes every thread waiting on it. }
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fShutDown := True;
    //waiters re-check fShutDown once they are released
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  866. {$IFNDEF FPC}
  867. { TThreadObjectList<T> }
  { Adds Item under the list lock, honouring Duplicates: dupAccept always adds,
    dupError raises EListError for an item already present, any other setting
    skips the duplicate silently. }
  procedure TThreadObjectList<T>.Add(const Item: T);
  begin
    LockList;
    try
      //the lookup only runs when duplicates are not accepted
      if (Duplicates <> dupAccept) and (fList.IndexOf(Item) <> -1) then
      begin
        if Duplicates = dupError then
          raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
      end
      else fList.Add(Item);
    finally
      UnlockList;
    end;
  end;
  { Removes every item while holding the list lock. }
  procedure TThreadObjectList<T>.Clear;
  var
    list : TObjectList<T>;
  begin
    list := LockList;
    try
      list.Clear;
    finally
      UnlockList;
    end;
  end;
  { Creates the lock object and the inner object list.
    OwnedObjects decides whether the inner list frees the objects it holds;
    it is now forwarded to TObjectList<T>, which previously always used its
    default regardless of the value passed.
    Duplicates starts as dupIgnore. }
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  894. destructor TThreadObjectList<T>.Destroy; // frees the inner list under the lock, then the lock object itself
  895. begin
  896. LockList; // taken so the list is not freed while another thread holds it
  897. try
  898. fList.Free;
  899. inherited Destroy;
  900. finally
  901. UnlockList; // the monitor must be exited before its owner object is freed
  902. fLock.Free;
  903. end;
  904. end;
  { Returns the item stored at aIndex, read while holding the list lock. }
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    list : TObjectList<T>;
  begin
    list := LockList;
    try
      Result := list[aIndex];
    finally
      UnlockList;
    end;
  end;
  { Enters the monitor on fLock and returns the inner list; every call must be
    paired with UnlockList. }
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Exit(fList);
  end;
  { Removes Item, searching from the beginning of the list. }
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    //locking is done inside RemoveItem
    RemoveItem(Item, TDirection.FromBeginning);
  end;
  { Removes Item searching in the given Direction, while holding the list
    lock. }
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    list : TObjectList<T>;
  begin
    list := LockList;
    try
      list.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  { Stores aValue at position aIndex while holding the list lock. }
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    list : TObjectList<T>;
  begin
    list := LockList;
    try
      list[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  { Leaves the monitor entered by LockList. }
  procedure TThreadObjectList<T>.UnlockList;
  begin
    //counterpart of the TMonitor.Enter call in LockList
    System.TMonitor.Exit(fLock);
  end;
  945. {$ENDIF}
  946. { TAnonymousThread }
  { Wraps aProc in a TAdvThread; aSynchronize is forwarded to that thread. }
  constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    //the thread is only created here; Start launches it
    fThread := TAdvThread.Create(aProc,aSynchronize);
  end;
  { Builds an anonymous thread for aProc with the synchronize flag off. }
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  begin
    Exit(TAnonymousThread.Create(aProc,False));
  end;
  { Builds an anonymous thread for aProc with the synchronize flag on. }
  class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  begin
    Exit(TAnonymousThread.Create(aProc,True));
  end;
  { Registers aProc as the exception handler of the wrapped thread and returns
    Self so calls can be chained. }
  function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  begin
    fThread.OnException(aProc);
    Result := Self;
  end;
  { Registers aProc as the termination handler (synchronize flag off) and
    returns Self so calls can be chained. }
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,False);
    Result := Self;
  end;
  { Registers aProc as the termination handler (synchronize flag on) and
    returns Self so calls can be chained. }
  function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,True);
    Result := Self;
  end;
  { Starts the wrapped thread. }
  procedure TAnonymousThread.Start;
  begin
    //delegates to the TAdvThread created in the constructor
    fThread.Start;
  end;
  978. { TTask }
  { Creates a pending, disabled task that will run aTaskProc.
    Each entry of aParamArray becomes a TParamValue named after its index;
    aOwnedParams is stored and also handed to every TParamValue. }
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    //state
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fEnabled := False;
    fExecuteProc := aTaskProc;
    fNumRetries := 0;
    fCustomFaultPolicy := False;
    //no step runs through Synchronize by default
    fExecuteWithSync := False;
    fTerminateWithSync := False;
    fExceptionWithSync := False;
    //fault control notifies retries through DoRetry
    fFaultControl := TFaultControl.Create;
    fFaultControl.OnRetry := DoRetry;
    //parameters
    fOwnedParams := aOwnedParams;
    fParamList := TParamList.Create(True);
    for idx := Low(aParamArray) to High(aParamArray) do
    begin
      fParamList.Add(TParamValue.Create(idx.ToString,aParamArray[idx],aOwnedParams));
      {$IFDEF FPC}
      fParamList[idx].Value._AddRef;
      {$ENDIF}
    end;
  end;
  { Frees the fault control, the parameter list and, when the stored result
    holds an object, that object too. }
  destructor TTask.Destroy;
  begin
    fFaultControl.Free;
    //free passed params
    fParamList.Free;
    //a result that carries an object is freed together with the task
    if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
    inherited;
  end;
  { Marks the task as failed with an exception and hands aException to the
    registered exception handler; without a handler the exception is raised. }
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    //no handler registered: propagate to the caller
    if not Assigned(fExceptProc) then raise aException;
    fExceptProc(Self,aException);
  end;
  1019. procedure TTask.DoExecute; // runs DoInitialize, then the task procedure, repeating while fFaultControl asks for a retry
  1020. begin
  1021. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1022. DoInitialize; // called once, outside the retry loop; its exception is not caught here
  1023. repeat
  1024. try
  1025. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1026. fTaskStatus := TWorkTaskStatus.wtsDone;
  1027. fFaultControl.SuccessExecution;
  1028. except
  1029. on E : Exception do
  1030. begin
  1031. fTaskStatus := TWorkTaskStatus.wtsException;
  1032. {$IFNDEF FPC}
  1033. fFaultControl.FailedExecution(AcquireExceptionObject as Exception); // the acquired exception object is handed to fFaultControl; presumably it owns it from here - confirm
  1034. {$ELSE}
  1035. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1036. {$ENDIF}
  1037. end;
  1038. end;
  1039. until not fFaultControl.NeedToRetry;
  1040. end;
  { Resets the fault control and runs the initialization handler when one is
    registered. Any Exception raised on the way is replaced by an
    ETaskInitializationError carrying the original message. }
  procedure TTask.DoInitialize;
  begin
    try
      fFaultControl.Reset;
      if Assigned(fInitializeProc) then
        fInitializeProc(Self);
    except
      on E : Exception do
        raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
    end;
  end;
  { True while the fault control does not report the task as failed. }
  function TTask.Done: Boolean;
  begin
    Exit(not fFaultControl.TaskFailed);
  end;
  { True when the fault control reports the task as failed. }
  function TTask.Failed: Boolean;
  begin
    Exit(fFaultControl.TaskFailed);
  end;
  { Returns the CircuitBreaked flag of the fault control. }
  function TTask.CircuitBreaked: Boolean;
  begin
    Exit(fFaultControl.CircuitBreaked);
  end;
  { Returns the last exception recorded by the fault control. }
  function TTask.LastException: Exception;
  begin
    Exit(fFaultControl.LastException);
  end;
  { Returns the retry limit configured in the fault control. }
  function TTask.MaxRetries: Integer;
  begin
    Exit(fFaultControl.MaxRetries);
  end;
  { Returns the retry count kept by the fault control. }
  function TTask.NumRetries: Integer;
  begin
    Exit(fFaultControl.NumRetries);
  end;
  { Retry callback of the fault control. vStopRetries starts as False and the
    registered retry handler, if any, may change it. }
  procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  begin
    vStopRetries := False;
    if not Assigned(fRetryProc) then Exit;
    fRetryProc(Self,aRaisedException,vStopRetries);
  end;
  { Runs the termination handler when one is registered. }
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  { Returns the id assigned to the task. }
  function TTask.GetIdTask: Int64;
  begin
    Exit(fIdTask);
  end;
  { Copies the retry settings of aFaultPolicy into the fault control. }
  procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  begin
    {$IFDEF FPC}
    //under FPC the fault control is created on demand
    if fFaultControl = nil then fFaultControl := TFaultControl.Create;
    {$ENDIF}
    fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
    fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  end;
  { Stores Value as the task id. }
  procedure TTask.SetIdTask(Value : Int64);
  begin
    //plain field write, no validation
    fIdTask := Value;
  end;
  { Returns the worker number stored by SetNumWorker. }
  function TTask.GetNumWorker: Integer;
  begin
    Exit(fNumWorker);
  end;
  { Returns the value of the parameter stored at position aIndex. }
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  begin
    //no bounds check here: an invalid index is left to the list
    Exit(fParamList[aIndex].Value);
  end;
  { Returns the value of the first parameter whose name matches aName
    (compared with CompareText). Raises ETaskParamError when none matches. }
  function TTask.GetParam(const aName: string): TFlexValue;
  var
    idx : Integer;
  begin
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) <> 0 then Continue;
      Exit(fParamList[idx].Value);
    end;
    //if not exists
    raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  end;
  { Returns a pointer to the value of the parameter stored at position aIndex,
    so the caller can modify it in place. }
  function TTask.GetParam2(aIndex: Integer): PFlexValue;
  var
    param : TParamValue;
  begin
    param := fParamList[aIndex];
    Result := @param.Value;
  end;
  { Returns the value stored by SetResult. }
  function TTask.GetResult: TFlexValue;
  begin
    Exit(fResult);
  end;
  { Returns the enabled flag of the task. }
  function TTask.IsEnabled: Boolean;
  begin
    Exit(fEnabled);
  end;
  { Stores the worker number and moves the task to the assigned status. }
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fNumWorker := Value;
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
  end;
  { Replaces the value of the parameter stored at position aIndex.
    Raises ETaskParamError when aIndex is outside 0..Count-1. The previous
    check used ">" and so let aIndex = Count (and negative values) through to
    the list, which reported them with its own exception type instead. }
  procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  begin
    if (aIndex < 0) or (aIndex >= fParamList.Count) then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
    fParamList[aIndex].Value := Value;
  end;
  { Sets the parameter called aName (compared with CompareText). An existing
    parameter only gets its value replaced; otherwise a new TParamValue is
    appended with the given aOwned flag. }
  procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  var
    idx : Integer;
  begin
    //update in place when the parameter already exists
    for idx := 0 to fParamList.Count - 1 do
    begin
      if CompareText(fParamList[idx].Name,aName) <> 0 then Continue;
      fParamList[idx].Value := Value;
      Exit;
    end;
    //not found: append it
    fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  end;
  { Sets the parameter called aName as a non-owned value. }
  procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  begin
    //delegates to the three-argument overload with aOwned = False
    SetParam(aName,Value,False);
  end;
  { Configures retries in the fault control and marks the task as carrying a
    custom fault policy. }
  procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  begin
    //a task flagged this way keeps its policy when a worker applies defaults
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := aMaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
    fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  end;
  { Stores aValue as the task result. }
  procedure TTask.SetResult(aValue: TFlexValue);
  begin
    //read back through GetResult
    fResult := aValue;
  end;
  { Configures retries from an array of wait times: the retry limit equals the
    number of entries, the fixed wait time and multiplier are zeroed, and the
    task is marked as carrying a custom fault policy. }
  procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  begin
    fCustomFaultPolicy := True;
    //High + 1 equals the number of entries in the array
    fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
    fFaultControl.WaitTimeBetweenRetriesMS := 0;
    fFaultControl.WaitTimeMultiplierFactor := 0;
    fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  end;
  { Returns the current status of the task. }
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Exit(fTaskStatus);
  end;
  1190. { TWorkTask }
  { Registers the exception handler and returns Self for call chaining. }
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  { Registers the initialization handler and returns Self for call chaining. }
  function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  { Registers the retry handler and returns Self for call chaining. }
  function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  { Registers the termination handler and returns Self for call chaining. }
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  { Enables the task; nothing is executed by this call itself. }
  procedure TWorkTask.Run;
  begin
    //the flag is what IsEnabled reports
    fEnabled := True;
  end;
  { Sets the parameter called aName as a non-owned value and returns Self for
    call chaining. }
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  begin
    SetParam(aName,aValue);
    Result := Self;
  end;
  { Sets the parameter called aName with the given ownership flag and returns
    Self for call chaining. }
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  begin
    SetParam(aName,aValue,aOwned);
    Result := Self;
  end;
  { Allows up to aMaxRetries retries with no wait time between them; returns
    Self for call chaining. }
  function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,0,0);
    Result := Self;
  end;
  { Sets the retry policy with a limit of -1 and no wait time; returns Self
    for call chaining. }
  function TWorkTask.RetryForever: IWorkTask;
  begin
    //-1 is the limit value used by every "forever" variant
    SetRetryPolicy(-1,0,0);
    Result := Self;
  end;
  { Allows up to aMaxRetries retries waiting aWaitTimeBetweenRetriesMS between
    them (multiplier 0); returns Self for call chaining. }
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0);
    Result := Self;
  end;
  { Allows up to aMaxRetries retries with the given wait time and multiplier
    factor; returns Self for call chaining. }
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  { Configures retries from an array of wait times (one retry per entry);
    returns Self for call chaining. }
  function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  { Sets the retry policy with a limit of -1 and the given wait time
    (multiplier 0); returns Self for call chaining. }
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0);
    Result := Self;
  end;
  { Sets the retry policy with a limit of -1 and the given wait time and
    multiplier factor; returns Self for call chaining.
    aMaxRetries is accepted but not used. }
  function TWorkTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  begin
    //the limit is always -1 regardless of aMaxRetries
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  1260. { TBackgroundTasks }
  { Discards every task still waiting in the queue. }
  procedure TBackgroundTasks.CancelAll;
  begin
    //only the queue is emptied; workers are not touched here
    fTaskQueue.Clear;
  end;
  { Stores the worker count and queue depth and creates the task queue with an
    INFINITE insert timeout and a 5000 extract timeout. Workers are not
    created here; that is done by Start. }
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fMaxQueue := aMaxQueue;
    fNumPushedTasks := 0;
    //queue timeouts
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  end;
  { Empties and shuts down the task queue, then frees the worker pool and the
    queue. The queue is only touched when it exists: if its constructor
    raised (for example on a depth below 10) this destructor runs with
    fTaskQueue = nil, and dereferencing it would hide the original error
    behind an access violation. }
  destructor TBackgroundTasks.Destroy;
  begin
    if Assigned(fTaskQueue) then
    begin
      CancelAll;
      //shutdown releases workers blocked on the queue before the pool is freed
      fTaskQueue.DoShutDown;
    end;
    //while fTaskQueue.QueueSize > 0 do Sleep(0);
    if Assigned(fWorkerPool) then fWorkerPool.Free;
    if Assigned(fTaskQueue) then fTaskQueue.Free;
    inherited;
  end;
  { Returns the QueueSize reported by the task queue. }
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Exit(fTaskQueue.QueueSize);
  end;
  { Queues a task without parameters. }
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  begin
    //empty parameter list, nothing to own
    Exit(AddTask([],False,aTaskProc));
  end;
  { Creates a work task for aTaskProc with the given parameters, assigns it the
    next task id and pushes it to the queue.
    Returns the task when the push is signaled, nil otherwise. Result is set
    explicitly because an interface result is not guaranteed to start as nil. }
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    worktask : IWorkTask;
  begin
    Result := nil;
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    Inc(fNumPushedTasks);
    worktask.SetIdTask(fNumPushedTasks);
    if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
    begin
      Result := worktask;
    end;
  end;
  { Queues a task like AddTask and flags it with ExecuteWithSync.
    Returns nil, without touching any flag, when the task could not be
    queued. }
  function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
    //AddTask yields nil when the push did not succeed; the cast below would then dereference nil
    if Result <> nil then TTask(Result).ExecuteWithSync := True;
  end;
  { Queues a task without parameters, flagged with ExecuteWithSync. }
  function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    //empty parameter list, nothing to own
    Exit(AddTask_Sync([],False,aTaskProc));
  end;
  { Replaces any existing worker pool with a new one holding
    fConcurrentWorkers queue workers (numbered from 1), starting each worker
    as soon as it has been added. }
  procedure TBackgroundTasks.Start;
  var
    num : Integer;
    queueworker : TWorker;
  begin
    //Free is a no-op on nil, so a first call needs no special case
    fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    for num := 1 to fConcurrentWorkers do
    begin
      queueworker := TQueueWorker.Create(num,fTaskQueue);
      fWorkerPool.Add(queueworker);
      queueworker.Start;
    end;
  end;
  1327. { TWorker }
  { Creates the thread suspended, with its own default fault policy, status
    wsSuspended and FreeOnTerminate off. }
  constructor TWorker.Create;
  begin
    //True = created suspended
    inherited Create(True);
    FreeOnTerminate := False;
    fStatus := TWorkerStatus.wsSuspended;
    fDefaultFaultPolicy := TFaultPolicy.Create;
  end;
  { Frees the default fault policy before the inherited thread cleanup. }
  destructor TWorker.Destroy;
  begin
    //Free already tolerates a nil reference
    fDefaultFaultPolicy.Free;
    inherited;
  end;
  { Applies the worker's default fault policy to aTask unless the task already
    carries a custom one. }
  procedure TWorker.SetFaultPolicy(aTask: TTask);
  begin
    if aTask.CustomFaultPolicy then Exit;
    aTask.SetFaultPolicy(fDefaultFaultPolicy);
  end;
  { Base implementation does nothing; the worker subclasses override it. }
  procedure TWorker.Execute;
  begin
    //intentionally empty
  end;
  { Executes the current task; the worker subclasses pass this method to
    Synchronize. }
  procedure TWorker.ExecuteTask;
  begin
    //parameterless wrapper so it can be used as a thread method
    fCurrentTask.DoExecute;
  end;
  { Notifies termination of the current task; the worker subclasses pass this
    method to Synchronize. }
  procedure TWorker.TerminateTask;
  begin
    //parameterless wrapper so it can be used as a thread method
    fCurrentTask.DoTerminate;
  end;
  1355. { TSimpleWorker }
  { Creates a worker bound to aTask; the thread frees itself when it ends. }
  constructor TSimpleWorker.Create(aTask : ITask);
  begin
    inherited Create;
    //overrides the FreeOnTerminate = False set by TWorker.Create
    FreeOnTerminate := True;
    fCurrentTask := aTask;
  end;
  1362. procedure TSimpleWorker.Execute; // runs the bound task once it is enabled and then terminates the thread
  1363. begin
  1364. fStatus := TWorkerStatus.wsIdle;
  1365. while not Terminated do // NOTE(review): no wait is visible in this loop while the task is nil or not yet enabled - confirm this spin is intended
  1366. begin
  1367. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1368. try
  1369. fStatus := TWorkerStatus.wsWorking;
  1370. try
  1371. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) // ExecuteWithSync routes the execution through Synchronize
  1372. else fCurrentTask.DoExecute;
  1373. except
  1374. on E : Exception do
  1375. begin
  1376. if fCurrentTask <> nil then fCurrentTask.DoException(E) // hand the exception to the task
  1377. else raise ETaskExecutionError.Create(e.Message);
  1378. end;
  1379. end;
  1380. finally
  1381. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) // termination is notified whether or not the execution raised
  1382. else fCurrentTask.DoTerminate;
  1383. fStatus := TWorkerStatus.wsIdle;
  1384. Terminate; // single-shot worker: ends the loop after the first run
  1385. end;
  1386. end;
  1387. fStatus := TWorkerStatus.wsSuspended
  1388. end;
  1389. { TQueueWorker }
  { Creates a worker identified by aNumWorker that takes its tasks from
    aTaskQueue. }
  constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  begin
    inherited Create;
    //the queue is shared, not owned by the worker
    fTaskQueue := aTaskQueue;
    fNumWorker := aNumWorker;
  end;
  1396. procedure TQueueWorker.Execute; // pops tasks from the queue and executes them until the thread is terminated
  1397. begin
  1398. fStatus := TWorkerStatus.wsIdle;
  1399. while not Terminated do
  1400. begin
  1401. fCurrentTask := fTaskQueue.PopItem; // nil when no item was popped; the loop then just iterates again
  1402. if fCurrentTask <> nil then
  1403. try
  1404. fStatus := TWorkerStatus.wsWorking;
  1405. try
  1406. fCurrentIdTask := fCurrentTask.GetIdTask;
  1407. SetFaultPolicy(TTask(fCurrentTask)); // applies the worker default policy unless the task has a custom one
  1408. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) // ExecuteWithSync routes the execution through Synchronize
  1409. else fCurrentTask.DoExecute;
  1410. except
  1411. on E : Exception do
  1412. begin
  1413. if fCurrentTask <> nil then fCurrentTask.DoException(E) // hand the exception to the task
  1414. else raise ETaskExecutionError.Create(e.Message);
  1415. end;
  1416. end;
  1417. finally
  1418. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) // termination is notified whether or not the execution raised
  1419. else fCurrentTask.DoTerminate;
  1420. fStatus := TWorkerStatus.wsIdle;
  1421. end;
  1422. end;
  1423. fStatus := TWorkerStatus.wsSuspended
  1424. end;
  1425. { TScheduledWorker }
  { Creates a worker bound to aScheduledTask; the thread frees itself when it
    ends. Except on DELPHILINUX the thread is named after the task for the
    debugger. aNumWorker is accepted but not used here. }
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create;
    {$IFNDEF DELPHILINUX}
    NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
    {$ENDIF}
    //overrides the FreeOnTerminate = False set by TWorker.Create
    fCurrentTask := aScheduledTask;
    FreeOnTerminate := True;
  end;
  1435. procedure TScheduledWorker.Execute; // executes the bound scheduled task once, then notifies termination and, if finished, expiration
  1436. begin
  1437. fStatus := TWorkerStatus.wsIdle;
  1438. if Assigned(fCurrentTask) then
  1439. begin
  1440. try
  1441. fStatus := TWorkerStatus.wsWorking;
  1442. try
  1443. SetFaultPolicy(TTask(fCurrentTask)); // applies the worker default policy unless the task has a custom one
  1444. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) // ExecuteWithSync routes the execution through Synchronize
  1445. else fCurrentTask.DoExecute;
  1446. fStatus := TWorkerStatus.wsIdle;
  1447. except
  1448. on E : Exception do
  1449. begin
  1450. if fCurrentTask <> nil then fCurrentTask.DoException(E) // hand the exception to the task
  1451. else raise ETaskExecutionError.Create(e.Message);
  1452. end;
  1453. end;
  1454. finally
  1455. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) // termination is notified whether or not the execution raised
  1456. else fCurrentTask.DoTerminate;
  1457. //check if expired
  1458. if (fCurrentTask as IScheduledTask).IsFinished then
  1459. begin
  1460. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask) // ExpireWithSync routes the expiration through Synchronize
  1461. else (fCurrentTask as IScheduledTask).DoExpire;
  1462. end;
  1463. end;
  1464. end;
  1465. fCurrentTask := nil; // drop the task reference before the thread ends
  1466. fStatus := TWorkerStatus.wsSuspended;
  1467. end;
  // Runs the expire step of the current task. Execute passes this method to
  // Synchronize when the task has ExpireWithSync set.
  procedure TScheduledWorker.ExpireTask;
  var
    scheduled : IScheduledTask;
  begin
    scheduled := fCurrentTask as IScheduledTask;
    scheduled.DoExpire;
  end;
  1472. { TScheduledTasks }
  // Convenience overload: registers a scheduled task without parameters.
  // Delegates to the full AddTask overload with an empty parameter array and
  // aOwnedParams = False.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := AddTask(aTaskName, [], False, aTaskProc);
  end;
  // Creates a scheduled task, names it, gives it the next sequential id and
  // appends it to fTaskList.
  //   aTaskName    : name assigned to the task
  //   aParamArray  : parameters forwarded to TScheduledTask.Create
  //   aOwnedParams : forwarded to TScheduledTask.Create
  //   aTaskProc    : procedure forwarded to TScheduledTask.Create
  // Returns the new task as IScheduledTask.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newtask : TScheduledTask;
  begin
    newtask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.Name := aTaskName;
    //ids come from the running count of pushed tasks
    Inc(fNumPushedTasks);
    newtask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newtask);
    Result := newtask;
  end;
  // Same as AddTask with parameters, but flags the new task with
  // ExecuteWithSync so the worker runs it through Synchronize.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask(aTaskName, aParamArray, aOwnedParams, aTaskProc);
    //the interface is cast back to its implementing object to reach the flag
    TTask(Result).ExecuteWithSync := True;
  end;
  // Convenience overload: registers a synchronized task without parameters.
  // Delegates to the full AddTask_Sync overload with an empty parameter array
  // and aOwnedParams = False.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask_Sync(aTaskName, [], False, aTaskProc);
  end;
  // Creates the task list and the fault policy, and resets the task counter
  // and the started flag. The scheduler thread itself is created by Start.
  constructor TScheduledTasks.Create;
  begin
    fTaskList := TScheduledTaskList.Create;
    fFaultPolicy := TFaultPolicy.Create;
    fIsStarted := False;
    fNumPushedTasks := 0;
  end;
  // Stops and joins the scheduler thread (if one was created) before freeing
  // it, then releases the task list and the fault policy.
  destructor TScheduledTasks.Destroy;
  begin
    if fScheduler <> nil then
    begin
      //ask the thread to end and wait until it has before freeing it
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    //Free is nil-safe, so no Assigned test is needed
    fTaskList.Free;
    fFaultPolicy.Free;
    inherited;
  end;
  // Looks up a scheduled task by its id.
  //   aIdTask : id to search for
  // Returns whatever the scheduler's Get returns for that id.
  // Raises ETaskSchedulerError when no scheduler exists yet (Start was never
  // called), matching the by-name overload instead of dereferencing nil.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  begin
    if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aIdTask);
  end;
  // Looks up a scheduled task by name.
  //   aTaskName : name to search for
  // Returns whatever the scheduler's Get returns for that name.
  // Raises ETaskSchedulerError when no scheduler has been created yet.
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  begin
    if fScheduler = nil then
    begin
      raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    end;
    Result := fScheduler.Get(aTaskName);
  end;
  // Starts the scheduler thread over fTaskList. Does nothing when already
  // started. Can be called again after Stop.
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;
    //a scheduler left over from an earlier Stop has been told to terminate and
    //a thread cannot be started a second time: join it, release it and build a
    //fresh one over the same task list (same sequence Destroy uses)
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
      fScheduler := nil;
    end;
    fScheduler := TScheduler.Create(fTaskList);
    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    fScheduler.Start;
    fIsStarted := True;
  end;
  // Asks the scheduler thread (if any) to terminate and clears the started
  // flag. Does not wait for the thread to end.
  procedure TScheduledTasks.Stop;
  begin
    if fScheduler <> nil then
    begin
      fScheduler.Terminate;
    end;
    fIsStarted := False;
  end;
  1541. { TScheduledTask }
  // Fluent setter: stores a named parameter through SetParam and returns the
  // task itself so calls can be chained.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  begin
    SetParam(aName, aValue);
    Result := Self;
  end;
  // Fluent setter: stores a named parameter together with its ownership flag
  // and returns the task itself so calls can be chained.
  //   aName  : parameter name
  //   aValue : parameter value
  //   aOwned : ownership flag for the value; it used to be dropped silently
  // NOTE(review): relies on the inherited SetParam(aName,aValue,aOwned)
  // overload declared outside this section - confirm it exists.
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  begin
    Result := Self;
    SetParam(aName,aValue,aOwned);
  end;
  // Fluent setter: resets the schedule and plans a run-once execution at
  // aStartDate. Returns the task itself.
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    //start date and next execution are the same moment
    fStartDate := aStartDate;
    fNextExecution := aStartDate;
    Result := Self;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution
  // aMinutes minutes from now. Returns the task itself.
  function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  var
    startsat : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startsat := IncMinute(Now(),aMinutes);
    fStartDate := startsat;
    fNextExecution := startsat;
    Result := Self;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution
  // aSeconds seconds from now. Returns the task itself.
  function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  var
    startsat : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startsat := IncSecond(Now(),aSeconds);
    fStartDate := startsat;
    fNextExecution := startsat;
    Result := Self;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution at the
  // current time. Returns the task itself.
  function TScheduledTask.StartNow: IScheduledTask;
  var
    startsat : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startsat := Now();
    fStartDate := startsat;
    fNextExecution := startsat;
    Result := Self;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution for
  // tomorrow's date with the time set to 0:0:0. Returns the task itself.
  function TScheduledTask.StartOnDayChange: IScheduledTask;
  var
    startsat : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startsat := ChangeTimeOfADay(Tomorrow(),0,0,0);
    fStartDate := startsat;
    fNextExecution := startsat;
    Result := Self;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution for
  // today at the given time of day. Returns the task itself.
  //   aHour, aMinute, aSecond : time of day (aSecond defaults to 0)
  function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    Result := Self;
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    //keep today's date and replace only the time part, the same helper
    //StartTomorrowAt and StartOnDayChange use; the previous call to
    //ChangeDateOfADay was handed hour/minute/second
    //NOTE(review): presumably that helper takes year/month/day - confirm
    fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
  end;
  // Fluent setter: resets the schedule and plans a run-once execution for
  // tomorrow at the given time of day. Returns the task itself.
  //   aHour, aMinute, aSecond : time of day (aSecond defaults to 0)
  function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  var
    startsat : TDateTime;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    startsat := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
    fStartDate := startsat;
    fNextExecution := startsat;
    Result := Self;
  end;
  // Fluent setter: forwards aMaxRetries to SetRetryPolicy with both remaining
  // arguments set to 0. Returns the task itself.
  function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, 0, 0);
    Result := Self;
  end;
  // Fluent setter: calls SetRetryPolicy(-1,0,0). Returns the task itself.
  // NOTE(review): -1 presumably means "no retry limit" - confirm against
  // SetRetryPolicy.
  function TScheduledTask.RetryForever: IScheduledTask;
  begin
    SetRetryPolicy(-1, 0, 0);
    Result := Self;
  end;
  // Fluent setter: forwards the retry count and the wait time (ms) to
  // SetRetryPolicy with a multiplier factor of 0. Returns the task itself.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS, 0);
    Result := Self;
  end;
  // Fluent setter: forwards the retry count, the wait time (ms) and the wait
  // multiplier factor to SetRetryPolicy. Returns the task itself.
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  begin
    SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS, aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Fluent setter: forwards an array of wait times to SetRetryPolicy.
  // Returns the task itself.
  function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  // Fluent setter: calls SetRetryPolicy with -1 as retry count, the given wait
  // time (ms) and a multiplier factor of 0. Returns the task itself.
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    SetRetryPolicy(-1, aWaitTimeBetweenRetriesMS, 0);
    Result := Self;
  end;
  // Fluent setter: calls SetRetryPolicy with -1 as retry count, the given wait
  // time (ms) and the given multiplier factor. Returns the task itself.
  // aMaxRetries is accepted but not used: -1 is always passed instead.
  function TScheduledTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  begin
    SetRetryPolicy(-1, aWaitTimeBetweenRetriesMS, aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Puts the task in repeat mode, running every aInterval units of
  // aTimeMeasure, and enables it.
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    //no start date chosen yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    //a start date in the past is moved up to the current time
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Puts the task in repeat mode, running every aInterval units of
  // aTimeMeasure, stores aEndTime as the expiration date and enables the task.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    //no start date chosen yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    //a start date in the past is moved up to the current time
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fExpirationDate := aEndTime;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Shortcut for RepeatEvery with an interval of 1 day.
  procedure TScheduledTask.RepeatEveryDay;
  begin
    RepeatEvery(1, TTimeMeasure.tmDays);
  end;
  // Shortcut for RepeatEvery with an interval of 7 days.
  procedure TScheduledTask.RepeatEveryWeek;
  begin
    RepeatEvery(7, TTimeMeasure.tmDays);
  end;
  // Puts the task in repeat mode, running every aInterval units of
  // aTimeMeasure, stores aRepeatTimes in fExpirationTimes and enables the task.
  // NOTE(review): CheckSchedule expires the task only once fExecutionTimes is
  // greater than fExpirationTimes - confirm whether aRepeatTimes is meant to
  // count total runs or repetitions after the first run.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    //no start date chosen yet: begin from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeMeasure := aTimeMeasure;
    fTimeInterval := aInterval;
    //a start date in the past is moved up to the current time
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fExpirationTimes := aRepeatTimes;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Puts the task in run-once mode and enables it. A start date in the past
  // is moved up to the current time.
  procedure TScheduledTask.RunOnce;
  begin
    fScheduleMode := TScheduleMode.smRunOnce;
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Marks the task as finished; IsFinished returns True from now on.
  procedure TScheduledTask.Cancel;
  begin
    fFinished := True;
  end;
  // Decides whether the task is due right now and updates its bookkeeping.
  // Returns True when the caller should execute the task.
  //  - never due while the task status is wtsRunning
  //  - run-once mode: due the first time the next-execution time is reached;
  //    the task is marked finished at that point
  //  - repeat mode: once the next-execution time is reached, the expiration
  //    limits are checked (marking the task finished when exceeded), the next
  //    execution is advanced by one interval and the task is due unless the
  //    advanced time is already in the past
  function TScheduledTask.CheckSchedule: Boolean;
  begin
    Result := False;
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;

    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      //already executed, or start date not reached yet
      if (fExecutionTimes <> 0) or (Now() < fNextExecution) then Exit;
      fLastExecution := Now();
      Inc(fExecutionTimes);
      fFinished := True;
      Result := True;
      Exit;
    end;

    //repeat mode: next execution not reached yet
    if Now() < fNextExecution then Exit;

    //check expiration limits
    if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
       ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
    begin
      fFinished := True;
      Exit;
    end;

    //calculate next execution
    case fTimeMeasure of
      tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
      tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
      tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
      tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
    end;

    //avoid execution if system time was altered
    if Now() <= fNextExecution then
    begin
      fLastExecution := Now();
      Inc(fExecutionTimes);
      Result := True;
    end;
  end;
  // Resets the schedule: clears the finished flag and all dates, and goes
  // back to run-once mode with a zero interval measured in seconds.
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    fFinished := False;
    //mode and interval
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeMeasure := TTimeMeasure.tmSeconds;
    fTimeInterval := 0;
    //dates
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
  end;
  // Expire step: calls the expired callback (when one is assigned) with the
  // task itself, then disables the task.
  procedure TScheduledTask.DoExpire;
  begin
    if Assigned(fExpiredProc) then
    begin
      fExpiredProc(Self);
    end;
    fEnabled := False;
  end;
  // Returns the current repeat schedule as a pair: Key is the time measure,
  // Value is the interval.
  function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  begin
    Result.Key := fTimeMeasure;
    Result.Value := fTimeInterval;
  end;
  // Returns the task name (fName).
  function TScheduledTask.GetTaskName: string;
  begin
    Exit(fName);
  end;
  // Returns the finished flag (fFinished), which Cancel and CheckSchedule set.
  function TScheduledTask.IsFinished: Boolean;
  begin
    Exit(fFinished);
  end;
  // Fluent setter: stores the exception callback. Returns the task itself.
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Fluent setter: stores the exception callback through OnException and sets
  // the ExceptionWithSync flag on the task. Returns the task itself.
  function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := OnException(aTaskProc);
    //the interface is cast back to its implementing object to reach the flag
    TTask(Result).ExceptionWithSync := True;
  end;
  // Fluent setter: stores the retry callback. Returns the task itself.
  function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  // Fluent setter: stores the callback that DoExpire invokes. Returns the
  // task itself.
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  // Fluent setter: stores the expired callback through OnExpired and sets the
  // ExpireWithSync flag, which makes the worker run the expire step through
  // Synchronize. Returns the task itself.
  function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnExpired(aTaskProc);
    //the interface is cast back to its implementing object to reach the flag
    TScheduledTask(Result).ExpireWithSync := True;
  end;
  // Fluent setter: stores the initialize callback. Returns the task itself.
  function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  // Fluent setter: stores the terminate callback. Returns the task itself.
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Fluent setter: stores the terminate callback through OnTerminated and sets
  // the TerminateWithSync flag, which makes the worker run the terminate step
  // through Synchronize. Returns the task itself.
  function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnTerminated(aTaskProc);
    //the interface is cast back to its implementing object to reach the flag
    TTask(Result).TerminateWithSync := True;
  end;
  1808. { TScheduler }
  // Builds the scheduler thread in suspended state over an existing task list.
  //   aTaskList : list the scheduler walks in Execute; it is only referenced
  //               here, not created
  // The thread does not free itself on termination. A critical section for
  // the list and an event used as the polling wait are created.
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  // Signals and frees the wait event, frees the list lock and drops the
  // reference to the task list (the list itself is not freed here).
  // NOTE(review): the event and the lock are released before the inherited
  // destructor runs, so the thread is presumably expected to have stopped
  // already; TScheduledTasks.Destroy terminates and waits before freeing.
  destructor TScheduler.Destroy;
  begin
    fCondVar.SetEvent;
    fCondVar.Free;
    fListLock.Free;
    fTaskList := nil;
    inherited;
  end;
  // Thread body: until terminated, walks the task list under fListLock and
  //  - starts a TScheduledWorker for every enabled, unfinished task whose
  //    CheckSchedule reports that it is due
  //  - removes enabled tasks that are already finished when
  //    fRemoveTaskAfterExpiration is set
  // then waits on fCondVar for up to 250 ms before the next pass.
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    i : Integer;
  begin
    numworker := 0;
    while not Terminated do
    begin
      fListLock.Enter;
      try
        //walk by index instead of for-in: deleting from the list while its
        //enumerator is active makes the enumerator skip the element that
        //follows the removed one
        i := 0;
        while i < fTaskList.Count do
        begin
          task := fTaskList[i];
          if (task.IsEnabled) and (not task.IsFinished) then
          begin
            if task.CheckSchedule then
            begin
              Inc(numworker);
              //the worker frees itself when its thread ends
              worker := TScheduledWorker.Create(numworker,task);
              worker.Start;
            end;
          end
          else
          begin
            if (task.IsEnabled) and fRemoveTaskAfterExpiration then
            begin
              //the next task has moved into slot i, so do not advance
              fTaskList.Delete(i);
              Continue;
            end;
          end;
          Inc(i);
        end;
        task := nil;
      finally
        fListLock.Leave;
      end;
      fCondVar.WaitFor(250);
    end;
  end;
  // Appends a task to the task list and returns what the list's Add returns.
  // Note: this method does not take fListLock.
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    Exit(fTaskList.Add(aTask));
  end;
  // Searches the task list, under fListLock, for the task with the given id.
  //   aIdTask : id to look for
  // Returns the first matching task, or nil when no task has that id.
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //interface results are not cleared automatically; without this a failed
    //search would return whatever the result slot held before the call
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  // Searches the task list, under fListLock, for a task by name. The
  // comparison uses CompareText, so it ignores case.
  //   aTaskName : name to look for
  // Returns the first matching task, or nil when no task has that name.
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //interface results are not cleared automatically; without this a failed
    //search would return whatever the result slot held before the call
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  1901. { TAdvThread }
  // Builds a suspended, self-freeing thread around an anonymous procedure.
  //   aProc        : procedure run by DoExecute
  //   aSynchronize : when True, Execute runs DoExecute through Synchronize
  constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    inherited Create(True);
    FreeOnTerminate := True;
    fExecuteProc := aProc;
    fExecuteWithSync := aSynchronize;
  end;
  // Runs the stored procedure (when assigned). An exception of class Exception
  // is handed to the exception callback when one is assigned; otherwise it is
  // re-raised unchanged.
  procedure TAdvThread.DoExecute;
  begin
    try
      if Assigned(fExecuteProc) then fExecuteProc;
    except
      on E : Exception do
      begin
        //AcquireExceptionObject takes the exception away from the RTL, so it
        //is no longer freed automatically when this handler ends
        //NOTE(review): the callback presumably has to free it - confirm
        {$IFNDEF FPC}
        if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
        {$ELSE}
        if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
        {$ENDIF}
        //bare raise re-raises the active exception; "raise e" would raise an
        //object that the handler destroys on exit
        else raise;
      end;
    end;
  end;
  // Invokes the terminate callback when one has been assigned.
  procedure TAdvThread.CallToTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc;
  end;
  // Runs the terminate callback, through Synchronize when fTerminateWithSync
  // is set and directly otherwise.
  procedure TAdvThread.DoTerminate;
  begin
    if not fTerminateWithSync then CallToTerminate
    else Synchronize(CallToTerminate);
  end;
  // Thread body: runs DoExecute, through Synchronize when fExecuteWithSync is
  // set and directly otherwise.
  procedure TAdvThread.Execute;
  begin
    if not fExecuteWithSync then DoExecute
    else Synchronize(DoExecute);
  end;
  // Stores the callback that DoExecute hands a caught exception to.
  procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  begin
    fExceptionProc := aProc;
  end;
  // Stores the terminate callback.
  //   aProc        : procedure invoked by CallToTerminate
  //   aSynchronize : when True, DoTerminate invokes it through Synchronize
  procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  begin
    fTerminateProc := aProc;
    fTerminateWithSync := aSynchronize;
  end;
  1948. { TRunTask }
  // Convenience overload: runs a task without parameters. Delegates to the
  // full Execute overload with an empty parameter array and
  // aOwnedParams = False.
  class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Execute([], False, aTaskProc);
  end;
  // Convenience overload: runs a synchronized task without parameters.
  // Delegates to the full Execute_Sync overload with an empty parameter array
  // and aOwnedParams = False.
  class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Execute_Sync([], False, aTaskProc);
  end;
  // Creates a work task with ExecuteWithSync = False, hands it to a new
  // TSimpleWorker and starts that worker. Returns the task.
  //   aParamArray  : parameters forwarded to TWorkTask.Create
  //   aOwnedParams : forwarded to TWorkTask.Create
  //   aTaskProc    : procedure forwarded to TWorkTask.Create
  class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    worktask : TWorkTask;
  begin
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    worktask.ExecuteWithSync := False;
    //take the interface reference before the worker gets the task
    Result := worktask;
    TSimpleWorker.Create(worktask).Start;
  end;
  // Creates a work task with ExecuteWithSync = True, hands it to a new
  // TSimpleWorker and starts that worker. Returns the task.
  //   aParamArray  : parameters forwarded to TWorkTask.Create
  //   aOwnedParams : forwarded to TWorkTask.Create
  //   aTaskProc    : procedure forwarded to TWorkTask.Create
  class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    worktask : TWorkTask;
  begin
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    worktask.ExecuteWithSync := True;
    //take the interface reference before the worker gets the task
    Result := worktask;
    TSimpleWorker.Create(worktask).Start;
  end;
  1979. { TParamValue }
  // Builds a named parameter from a TFlexValue.
  //   aName       : parameter name
  //   aValue      : parameter value
  //   aOwnedValue : when True, Destroy frees the object held by the value
  constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Builds a named parameter from a TVarRec, which is assigned to the
  // TFlexValue field.
  //   aName       : parameter name
  //   aValue      : parameter value
  //   aOwnedValue : when True, Destroy frees the object held by the value
  constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  begin
    inherited Create;
    fOwned := aOwnedValue;
    fName := aName;
    fValue := aValue;
  end;
  // Builds an unnamed parameter whose value is not owned.
  constructor TParamValue.Create;
  begin
    fOwned := False;
    fName := '';
  end;
  // Releases the value. Under FPC the value's _Release is called first. When
  // the parameter owns its value and the value holds a non-nil object, that
  // object is freed.
  destructor TParamValue.Destroy;
  begin
    {$IFDEF FPC}
    fValue._Release;
    {$ENDIF}
    if fOwned and fValue.IsObject then
    begin
      if fValue.AsObject <> nil then fValue.AsObject.Free;
    end;
    inherited;
  end;
  2007. end.