Quick.Threads.pas 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636
  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 16/01/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. Quick.Commons,
  31. //Quick.Chrono,
  32. Quick.Value,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFDEF FPC}
  126. TProc = procedure of object;
  127. {$ENDIF}
  128. TAdvThread = class(TThread)
  129. private
  130. fExecuteProc : TProc;
  131. fTerminateProc : TProc;
  132. fExecuteWithSync : Boolean;
  133. fTerminateWithSync : Boolean;
  134. procedure DoExecute;
  135. procedure CallToTerminate;
  136. protected
  137. procedure DoTerminate; override;
  138. public
  139. constructor Create(aProc : TProc; aSynchronize : Boolean);
  140. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  141. procedure Execute; override;
  142. end;
  143. IAnonymousThread = interface
  144. procedure Start;
  145. function OnTerminate(aProc : TProc) : IAnonymousThread;
  146. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  147. end;
  148. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  149. private
  150. fThread : TAdvThread;
  151. constructor Create(aProc : TProc; aSynchronize : Boolean);
  152. public
  153. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  154. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  155. procedure Start;
  156. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  157. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  158. end;
  159. TParamArray = array of TFlexValue;
  160. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  161. TScheduleMode = (smRunOnce, smRepeatMode);
  162. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds);
  163. ITask = interface
  164. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  165. function GetParam(aIndex : Integer) : TFlexValue;
  166. function TaskStatus : TWorkTaskStatus;
  167. function GetNumWorker : Integer;
  168. procedure SetNumWorker(Value : Integer);
  169. function GetIdTask : Int64;
  170. procedure SetIdTask(Value : Int64);
  171. procedure DoExecute;
  172. procedure DoException(aException : Exception);
  173. procedure DoTerminate;
  174. property Param[index : Integer] : TFlexValue read GetParam;
  175. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  176. property IdTask : Int64 read GetIdTask;
  177. function IsEnabled : Boolean;
  178. end;
  179. {$IFNDEF FPC}
  180. TTaskProc = reference to procedure(task : ITask);
  181. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  182. {$ELSE}
  183. TTaskProc = procedure(task : ITask) of object;
  184. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  185. {$ENDIF}
  186. IWorkTask = interface(ITask)
  187. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  188. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  189. procedure Run;
  190. end;
  191. IScheduledTask = interface(ITask)
  192. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  193. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  194. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  195. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  196. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  197. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  198. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  199. function CheckSchedule : Boolean;
  200. procedure DoExpire;
  201. function GetTaskName : string;
  202. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  203. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  204. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  205. function StartOnDayChange : IScheduledTask;
  206. procedure RunOnce;
  207. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  208. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  209. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  210. procedure RepeatEveryDay;
  211. procedure RepeatEveryWeek;
  212. function IsFinished : Boolean;
  213. procedure Cancel;
  214. property Name : string read GetTaskName;
  215. end;
  216. TTask = class(TInterfacedObject,ITask)
  217. private
  218. fIdTask : Int64;
  219. fNumWorker : Integer;
  220. fParamArray : TParamArray;
  221. fExecuteProc : TTaskProc;
  222. fExceptProc : TTaskExceptionProc;
  223. fTerminateProc : TTaskProc;
  224. fExpiredProc : TTaskProc;
  225. fTaskStatus : TWorkTaskStatus;
  226. fOwnedParams : Boolean;
  227. fEnabled : Boolean;
  228. fExecuteWithSync : Boolean;
  229. fExceptionWithSync : Boolean;
  230. fTerminateWithSync : Boolean;
  231. function GetParam(aIndex : Integer) : TFlexValue;
  232. procedure DoExecute;
  233. procedure DoException(aException : Exception);
  234. procedure DoTerminate;
  235. function GetNumWorker : Integer;
  236. procedure SetNumWorker(Value : Integer);
  237. function GetIdTask : Int64;
  238. procedure SetIdTask(Value : Int64);
  239. protected
  240. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  241. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  242. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  243. public
  244. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  245. destructor Destroy; override;
  246. property IdTask : Int64 read GetIdTask;
  247. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  248. function IsEnabled : Boolean;
  249. function TaskStatus : TWorkTaskStatus;
  250. end;
  251. TWorkTask = class(TTask,IWorkTask)
  252. public
  253. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  254. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  255. procedure Run; virtual;
  256. end;
  257. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  258. TScheduledTask = class(TTask,IScheduledTask)
  259. private
  260. fName : string;
  261. fExecutionTimes : Integer;
  262. fScheduleMode : TScheduleMode;
  263. fTimeInterval : Integer;
  264. fTimeMeasure : TTimeMeasure;
  265. fStartDate : TDateTime;
  266. fLastExecution : TDateTime;
  267. fNextExecution : TDateTime;
  268. fExpirationDate : TDateTime;
  269. fExpirationTimes : Integer;
  270. fFinished : Boolean;
  271. fExpireWithSync: Boolean;
  272. procedure ClearSchedule;
  273. function CheckSchedule : Boolean;
  274. procedure DoExpire;
  275. function GetTaskName : string;
  276. protected
  277. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  278. public
  279. property Name : string read fName write fName;
  280. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  281. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  282. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  283. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  284. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  285. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  286. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  287. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  288. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  289. function StartOnDayChange : IScheduledTask;
  290. procedure RunOnce;
  291. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  292. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  293. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  294. procedure RepeatEveryDay;
  295. procedure RepeatEveryWeek;
  296. function IsFinished : Boolean;
  297. procedure Cancel;
  298. end;
  299. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  300. TWorker = class(TThread)
  301. private
  302. fNumWorker : Integer;
  303. fCurrentIdTask : Integer;
  304. fStatus : TWorkerStatus;
  305. fTaskQueue : TTaskQueue;
  306. procedure ExecuteTask;
  307. procedure TerminateTask;
  308. protected
  309. fCurrentTask : ITask;
  310. public
  311. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  312. property NumWorker : Integer read fNumWorker;
  313. property Status : TWorkerStatus read fStatus;
  314. procedure Execute; override;
  315. end;
  316. TScheduledWorker = class(TWorker)
  317. private
  318. procedure ExpireTask;
  319. public
  320. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  321. procedure Execute; override;
  322. end;
  323. TWorkerPool = TObjectList<TWorker>;
  324. TBackgroundTasks = class
  325. private
  326. fMaxQueue : Integer;
  327. fWorkerPool : TWorkerPool;
  328. fConcurrentWorkers : Integer;
  329. fInsertTimeout : Cardinal;
  330. fExtractTimeout : Cardinal;
  331. fTaskQueue : TTaskQueue;
  332. fNumPushedTasks : Int64;
  333. function GetTaskQueue : Cardinal;
  334. public
  335. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  336. destructor Destroy; override;
  337. property MaxQueue : Integer read fMaxQueue;
  338. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  339. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  340. property TaskQueued : Cardinal read GetTaskQueue;
  341. property NumPushedTasks : Int64 read fNumPushedTasks;
  342. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  343. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  344. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  345. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  346. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  347. procedure Start;
  348. procedure CancelAll;
  349. end;
  350. TScheduledTaskList = TList<IScheduledTask>;
  351. TScheduler = class(TThread)
  352. private
  353. fListLock : TCriticalSection;
  354. fCondVar : TSimpleEvent;
  355. fTaskList : TScheduledTaskList;
  356. fRemoveTaskAfterExpiration : Boolean;
  357. procedure ExpireTask;
  358. public
  359. constructor Create(aTaskList : TScheduledTaskList);
  360. destructor Destroy; override;
  361. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  362. procedure Execute; override;
  363. function Add(aTask : TScheduledTask) : Integer;
  364. function Get(aIdTask : Int64) : IScheduledTask; overload;
  365. function Get(const aTaskName : string) : IScheduledTask; overload;
  366. end;
  367. TScheduledTasks = class
  368. private
  369. fTaskList : TScheduledTaskList;
  370. fScheduler : TScheduler;
  371. fNumPushedTasks : Int64;
  372. fRemoveTaskAfterExpiration : Boolean;
  373. fIsStarted : Boolean;
  374. public
  375. constructor Create;
  376. destructor Destroy; override;
  377. property NumPushedTasks : Int64 read fNumPushedTasks;
  378. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  379. property IsStarted : Boolean read fIsStarted;
  380. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  381. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  382. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  383. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  384. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  385. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  386. procedure Start;
  387. procedure Stop;
  388. end;
  389. implementation
  390. { TThreadedQueueCS<T> }
  391. procedure TThreadedQueueCS<T>.Clear;
  392. var
  393. obj : T;
  394. begin
  395. FQueueLock.Enter;
  396. try
  397. for obj in FQueue do
  398. begin
  399. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  400. end;
  401. SetLength(FQueue,0);
  402. finally
  403. FQueueLock.Leave;
  404. end;
  405. {$IFDEF FPC}
  406. FQueueCondVar.SetEvent;
  407. {$ELSE}
  408. FQueueCondVar.ReleaseAll;
  409. {$ENDIF}
  410. end;
  411. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  412. begin
  413. inherited Create;
  414. SetLength(FQueue, AQueueDepth);
  415. FQueueLock := TCriticalSection.Create;
  416. {$IFDEF FPC}
  417. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  418. {$ELSE}
  419. FQueueCondVar := TConditionVariableCS.Create;
  420. {$ENDIF}
  421. FPushTimeout := PushTimeout;
  422. FPopTimeout := PopTimeout;
  423. end;
  424. destructor TThreadedQueueCS<T>.Destroy;
  425. begin
  426. DoShutDown;
  427. FQueueLock.Free;
  428. FQueueCondVar.Free;
  429. inherited;
  430. end;
  431. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  432. begin
  433. FQueueLock.Enter;
  434. try
  435. SetLength(FQueue, Length(FQueue) + ADelta);
  436. finally
  437. FQueueLock.Leave;
  438. end;
  439. {$IFDEF FPC}
  440. FQueueCondVar.SetEvent;
  441. {$ELSE}
  442. FQueueCondVar.ReleaseAll;
  443. {$ENDIF}
  444. end;
  445. function TThreadedQueueCS<T>.PopItem: T;
  446. var
  447. LQueueSize: Integer;
  448. begin
  449. PopItem(LQueueSize, Result);
  450. end;
  451. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  452. begin
  453. AItem := Default(T);
  454. FQueueLock.Enter;
  455. try
  456. Result := wrSignaled;
  457. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  458. begin
  459. {$IFDEF FPC}
  460. Result := FQueueCondVar.WaitFor(FPopTimeout);
  461. {$ELSE}
  462. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  463. {$ENDIF}
  464. end;
  465. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  466. AItem := FQueue[FQueueOffset];
  467. FQueue[FQueueOffset] := Default(T);
  468. if FQueueSize = Length(FQueue) then
  469. begin
  470. {$IFDEF FPC}
  471. FQueueCondVar.SetEvent;
  472. {$ELSE}
  473. FQueueCondVar.ReleaseAll;
  474. {$ENDIF}
  475. end;
  476. Dec(FQueueSize);
  477. Inc(FQueueOffset);
  478. Inc(FTotalItemsPopped);
  479. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  480. finally
  481. AQueueSize := FQueueSize;
  482. FQueueLock.Leave;
  483. end;
  484. end;
  485. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  486. var
  487. LQueueSize: Integer;
  488. begin
  489. Result := PopItem(LQueueSize, AItem);
  490. end;
  491. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  492. begin
  493. PopItem(AQueueSize, Result);
  494. end;
  495. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  496. var
  497. LQueueSize: Integer;
  498. begin
  499. Result := PushItem(AItem, LQueueSize);
  500. end;
  501. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  502. begin
  503. FQueueLock.Enter;
  504. try
  505. Result := wrSignaled;
  506. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  507. begin
  508. {$IFDEF FPC}
  509. Result := FQueueCondVar.WaitFor(FPushTimeout);
  510. {$ELSE}
  511. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  512. {$ENDIF}
  513. end;
  514. if FShutDown or (Result <> wrSignaled) then Exit;
  515. if FQueueSize = 0 then
  516. begin
  517. {$IFDEF FPC}
  518. FQueueCondVar.SetEvent;
  519. {$ELSE}
  520. FQueueCondVar.ReleaseAll;
  521. {$ENDIF}
  522. end;
  523. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  524. Inc(FQueueSize);
  525. Inc(FTotalItemsPushed);
  526. finally
  527. AQueueSize := FQueueSize;
  528. FQueueLock.Leave;
  529. end;
  530. end;
  531. procedure TThreadedQueueCS<T>.DoShutDown;
  532. begin
  533. FShutDown := True;
  534. {$IFDEF FPC}
  535. FQueueCondVar.SetEvent;
  536. {$ELSE}
  537. FQueueCondVar.ReleaseAll;
  538. {$ENDIF}
  539. end;
  540. { TThreadedQueueList<T> }
  541. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  542. begin
  543. inherited Create;
  544. fQueue := TQueue<T>.Create;
  545. fQueue.Capacity := AQueueDepth;
  546. fQueueSize := 0;
  547. fQueueLock := TCriticalSection.Create;
  548. {$IFDEF FPC}
  549. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  550. {$ELSE}
  551. fQueueCondVar := TConditionVariableCS.Create;
  552. {$ENDIF}
  553. fPushTimeout := PushTimeout;
  554. fPopTimeout := PopTimeout;
  555. end;
  556. destructor TThreadedQueueList<T>.Destroy;
  557. begin
  558. DoShutDown;
  559. fQueueLock.Free;
  560. fQueueCondVar.Free;
  561. fQueue.Free;
  562. inherited;
  563. end;
  564. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  565. begin
  566. fQueueLock.Enter;
  567. try
  568. fQueue.Capacity := fQueue.Capacity + ADelta;
  569. finally
  570. fQueueLock.Leave;
  571. end;
  572. {$IFDEF FPC}
  573. FQueueCondVar.SetEvent;
  574. {$ELSE}
  575. FQueueCondVar.ReleaseAll;
  576. {$ENDIF}
  577. end;
  578. function TThreadedQueueList<T>.PopItem: T;
  579. var
  580. LQueueSize: Integer;
  581. begin
  582. PopItem(LQueueSize, Result);
  583. end;
  584. {$IFDEF FPC}
  585. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  586. //var
  587. //crono : TChronometer;
  588. begin
  589. AItem := Default(T);
  590. //crono := TChronometer.Create(False);
  591. try
  592. Result := wrSignaled;
  593. //writeln('popitem');
  594. //crono.Start;
  595. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  596. begin
  597. //crono.Start;
  598. Result := FQueueCondVar.WaitFor(FPopTimeout);
  599. //crono.Stop;
  600. //writeln('in: ' + crono.ElapsedTime);
  601. //if result = twaitresult.wrError then result := twaitresult.wrError;
  602. end;
  603. //crono.Stop;
  604. //writeln('out: ' + crono.ElapsedTime);
  605. fQueueLock.Enter;
  606. try
  607. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  608. AItem := fQueue.Extract;
  609. Dec(FQueueSize);
  610. Inc(fTotalItemsPopped);
  611. finally
  612. fQueueLock.Leave;
  613. end;
  614. finally
  615. AQueueSize := fQueueSize;
  616. end;
  617. end;
  618. {$ELSE}
  619. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  620. begin
  621. AItem := Default(T);
  622. fQueueLock.Enter;
  623. try
  624. Result := wrSignaled;
  625. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  626. begin
  627. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  628. end;
  629. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  630. AItem := fQueue.Extract;
  631. if fQueueSize = fQueue.Count then
  632. begin
  633. FQueueCondVar.ReleaseAll;
  634. end;
  635. Dec(FQueueSize);
  636. Inc(fTotalItemsPopped);
  637. finally
  638. AQueueSize := fQueueSize;
  639. fQueueLock.Leave;
  640. end;
  641. end;
  642. {$ENDIF}
  643. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  644. var
  645. LQueueSize: Integer;
  646. begin
  647. Result := PopItem(LQueueSize, AItem);
  648. end;
  649. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  650. begin
  651. PopItem(AQueueSize, Result);
  652. end;
  653. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  654. var
  655. LQueueSize: Integer;
  656. begin
  657. Result := PushItem(AItem, LQueueSize);
  658. end;
  659. {$IFDEF FPC}
  660. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  661. begin
  662. FQueueLock.Enter;
  663. try
  664. Result := wrSignaled;
  665. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  666. //begin
  667. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  668. //end;
  669. if fShutDown or (Result <> wrSignaled) then Exit;
  670. //if fQueueSize = 0 then
  671. //begin
  672. // FQueueCondVar.SetEvent;
  673. //end;
  674. fQueue.Enqueue(AItem);
  675. Inc(FQueueSize);
  676. Inc(fTotalItemsPushed);
  677. finally
  678. AQueueSize := fQueueSize;
  679. FQueueLock.Leave;
  680. //FQueueCondVar.SetEvent;
  681. end;
  682. end;
  683. {$ELSE}
  684. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  685. begin
  686. FQueueLock.Enter;
  687. try
  688. Result := wrSignaled;
  689. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  690. //begin
  691. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  692. //end;
  693. if fShutDown or (Result <> wrSignaled) then Exit;
  694. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  695. fQueue.Enqueue(AItem);
  696. Inc(FQueueSize);
  697. Inc(fTotalItemsPushed);
  698. finally
  699. AQueueSize := fQueueSize;
  700. FQueueLock.Leave;
  701. end;
  702. end;
  703. {$ENDIF}
  704. procedure TThreadedQueueList<T>.DoShutDown;
  705. begin
  706. fShutDown := True;
  707. {$IFDEF FPC}
  708. FQueueCondVar.SetEvent;
  709. {$ELSE}
  710. FQueueCondVar.ReleaseAll;
  711. {$ENDIF}
  712. end;
  713. {$IFNDEF FPC}
  714. { TThreadObjectList<T> }
  715. procedure TThreadObjectList<T>.Add(const Item: T);
  716. begin
  717. LockList;
  718. try
  719. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  720. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  721. finally
  722. UnlockList;
  723. end;
  724. end;
  725. procedure TThreadObjectList<T>.Clear;
  726. begin
  727. LockList;
  728. try
  729. fList.Clear;
  730. finally
  731. UnlockList;
  732. end;
  733. end;
  734. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  735. begin
  736. inherited Create;
  737. fLock := TObject.Create;
  738. fList := TObjectList<T>.Create;
  739. fDuplicates := dupIgnore;
  740. end;
  741. destructor TThreadObjectList<T>.Destroy;
  742. begin
  743. LockList;
  744. try
  745. fList.Free;
  746. inherited Destroy;
  747. finally
  748. UnlockList;
  749. fLock.Free;
  750. end;
  751. end;
  752. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  753. begin
  754. LockList;
  755. try
  756. Result := fList[aIndex];
  757. finally
  758. UnlockList;
  759. end;
  760. end;
  761. function TThreadObjectList<T>.LockList: TObjectList<T>;
  762. begin
  763. System.TMonitor.Enter(fLock);
  764. Result := fList;
  765. end;
  766. procedure TThreadObjectList<T>.Remove(const Item: T);
  767. begin
  768. RemoveItem(Item, TDirection.FromBeginning);
  769. end;
  770. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  771. begin
  772. LockList;
  773. try
  774. fList.RemoveItem(Item, Direction);
  775. finally
  776. UnlockList;
  777. end;
  778. end;
  779. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  780. begin
  781. LockList;
  782. try
  783. fList[aIndex] := aValue;
  784. finally
  785. UnlockList;
  786. end;
  787. end;
  788. procedure TThreadObjectList<T>.UnlockList;
  789. begin
  790. System.TMonitor.Exit(fLock);
  791. end;
  792. {$ENDIF}
  793. { TAnonymousThread }
  794. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  795. begin
  796. fThread := TAdvThread.Create(aProc,aSynchronize);
  797. end;
  798. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  799. begin
  800. Result := TAnonymousThread.Create(aProc,False);
  801. end;
  802. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  803. begin
  804. Result := TAnonymousThread.Create(aProc,True);
  805. end;
  806. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  807. begin
  808. Result := Self;
  809. fThread.OnTerminate(aProc,False);
  810. end;
  811. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  812. begin
  813. Result := Self;
  814. fThread.OnTerminate(aProc,True);
  815. end;
  816. procedure TAnonymousThread.Start;
  817. begin
  818. fThread.Start;
  819. end;
  820. { TTask }
  821. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  822. var
  823. i : Integer;
  824. begin
  825. fTaskStatus := TWorkTaskStatus.wtsPending;
  826. fExecuteWithSync := False;
  827. fTerminateWithSync := False;
  828. fExceptionWithSync := False;
  829. fOwnedParams := aOwnedParams;
  830. SetLength(fParamArray,High(aParamArray)+1);
  831. for i := Low(aParamArray) to High(aParamArray) do
  832. begin
  833. fParamArray[i].Create(aParamArray[i]);
  834. {$IFDEF FPC}
  835. fParamArray[i]._AddRef;
  836. {$ENDIF}
  837. end;
  838. fExecuteProc := aTaskProc;
  839. fEnabled := False;
  840. end;
  841. destructor TTask.Destroy;
  842. var
  843. i : Integer;
  844. begin
  845. //free passed params
  846. if fOwnedParams then
  847. begin
  848. for i := Low(fParamArray) to High(fParamArray) do
  849. begin
  850. {$IFDEF FPC}
  851. fParamArray[i]._Release;
  852. {$ENDIF}
  853. if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
  854. end;
  855. end;
  856. fParamArray := nil;
  857. inherited;
  858. end;
  859. procedure TTask.DoException(aException : Exception);
  860. begin
  861. fTaskStatus := TWorkTaskStatus.wtsException;
  862. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  863. else raise aException;
  864. end;
  865. procedure TTask.DoExecute;
  866. begin
  867. fTaskStatus := TWorkTaskStatus.wtsRunning;
  868. if Assigned(fExecuteProc) then fExecuteProc(Self);
  869. fTaskStatus := TWorkTaskStatus.wtsDone;
  870. end;
  871. procedure TTask.DoTerminate;
  872. begin
  873. if Assigned(fTerminateProc) then fTerminateProc(Self);
  874. end;
  875. function TTask.GetIdTask: Int64;
  876. begin
  877. Result := fIdTask;
  878. end;
  879. procedure TTask.SetIdTask(Value : Int64);
  880. begin
  881. fIdTask := Value;
  882. end;
  883. function TTask.GetNumWorker: Integer;
  884. begin
  885. Result := fNumWorker;
  886. end;
  887. function TTask.GetParam(aIndex: Integer): TFlexValue;
  888. begin
  889. Result := fParamArray[aIndex];
  890. end;
  891. function TTask.IsEnabled: Boolean;
  892. begin
  893. Result := fEnabled;
  894. end;
  895. procedure TTask.SetNumWorker(Value: Integer);
  896. begin
  897. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  898. fNumWorker := Value;
  899. end;
  900. function TTask.TaskStatus: TWorkTaskStatus;
  901. begin
  902. Result := fTaskStatus;
  903. end;
  904. { TWorkTask }
  905. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  906. begin
  907. fExceptProc := aTaskProc;
  908. Result := Self;
  909. end;
  910. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  911. begin
  912. fTerminateProc := aTaskProc;
  913. Result := Self;
  914. end;
  915. procedure TWorkTask.Run;
  916. begin
  917. fEnabled := True;
  918. end;
  919. { TBackgroundTasks }
  920. procedure TBackgroundTasks.CancelAll;
  921. begin
  922. fTaskQueue.Clear;
  923. end;
  924. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  925. begin
  926. fMaxQueue := aMaxQueue;
  927. fConcurrentWorkers := aConcurrentWorkers;
  928. fInsertTimeout := INFINITE;
  929. fExtractTimeout := 5000;
  930. fNumPushedTasks := 0;
  931. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  932. end;
  933. destructor TBackgroundTasks.Destroy;
  934. begin
  935. if Assigned(fWorkerPool) then fWorkerPool.Free;
  936. if Assigned(fTaskQueue) then fTaskQueue.Free;
  937. inherited;
  938. end;
  939. function TBackgroundTasks.GetTaskQueue: Cardinal;
  940. begin
  941. Result := fTaskQueue.QueueSize;
  942. end;
  943. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  944. begin
  945. Result := AddTask([],False,aTaskProc);
  946. end;
  947. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  948. var
  949. worktask : IWorkTask;
  950. begin
  951. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  952. Inc(fNumPushedTasks);
  953. worktask.SetIdTask(fNumPushedTasks);
  954. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  955. begin
  956. Result := worktask;
  957. end;
  958. end;
  959. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  960. begin
  961. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  962. TTask(Result).ExecuteWithSync := True;
  963. end;
  964. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  965. begin
  966. Result := AddTask_Sync([],False,aTaskProc);
  967. end;
  968. procedure TBackgroundTasks.Start;
  969. var
  970. i : Integer;
  971. worker : TWorker;
  972. begin
  973. //create workers
  974. if fWorkerPool <> nil then fWorkerPool.Free;
  975. fWorkerPool := TWorkerPool.Create(True);
  976. for i := 1 to fConcurrentWorkers do
  977. begin
  978. worker := TWorker.Create(i,fTaskQueue);
  979. fWorkerPool.Add(worker);
  980. worker.Start;
  981. end;
  982. end;
  983. { TWorker }
  984. constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  985. begin
  986. inherited Create(True);
  987. fNumWorker := aNumWorker;
  988. fStatus := TWorkerStatus.wsSuspended;
  989. fTaskQueue := aTaskQueue;
  990. FreeOnTerminate := False;
  991. end;
  992. procedure TWorker.ExecuteTask;
  993. begin
  994. fCurrentTask.DoExecute;
  995. end;
  996. procedure TWorker.TerminateTask;
  997. begin
  998. fCurrentTask.DoTerminate;
  999. end;
  1000. procedure TWorker.Execute;
  1001. begin
  1002. fStatus := TWorkerStatus.wsIdle;
  1003. while (not Terminated) and (fTaskQueue.QueueSize > 0) do
  1004. begin
  1005. fCurrentTask := fTaskQueue.PopItem;
  1006. if fCurrentTask <> nil then
  1007. try
  1008. fStatus := TWorkerStatus.wsWorking;
  1009. try
  1010. fCurrentIdTask := fCurrentTask.GetIdTask;
  1011. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1012. else fCurrentTask.DoExecute;
  1013. except
  1014. on E : Exception do
  1015. begin
  1016. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1017. else raise Exception.Create(e.Message);
  1018. end;
  1019. end;
  1020. finally
  1021. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1022. else fCurrentTask.DoTerminate;
  1023. fStatus := TWorkerStatus.wsIdle;
  1024. end;
  1025. end;
  1026. fStatus := TWorkerStatus.wsSuspended
  1027. end;
  1028. { TScheduledTasks }
  1029. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1030. begin
  1031. Result := AddTask(aTaskName,[],False,aTaskProc);
  1032. end;
  1033. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1034. var
  1035. scheduletask : TScheduledTask;
  1036. begin
  1037. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1038. scheduletask.Name := aTaskName;
  1039. Inc(fNumPushedTasks);
  1040. scheduletask.SetIdTask(fNumPushedTasks);
  1041. fTaskList.Add(scheduletask);
  1042. Result := scheduletask;
  1043. end;
  1044. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1045. begin
  1046. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1047. TTask(Result).ExecuteWithSync := True;
  1048. end;
  1049. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1050. begin
  1051. Result := AddTask_Sync(aTaskName,aTaskProc);
  1052. end;
  1053. constructor TScheduledTasks.Create;
  1054. begin
  1055. fNumPushedTasks := 0;
  1056. fIsStarted := False;
  1057. fTaskList := TScheduledTaskList.Create;
  1058. end;
  1059. destructor TScheduledTasks.Destroy;
  1060. begin
  1061. if Assigned(fScheduler) then
  1062. begin
  1063. fScheduler.Terminate;
  1064. fScheduler.WaitFor;
  1065. fScheduler.Free;
  1066. end;
  1067. if Assigned(fTaskList) then fTaskList.Free;
  1068. inherited;
  1069. end;
  1070. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1071. begin
  1072. Result := fScheduler.Get(aIdTask);
  1073. end;
  1074. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1075. begin
  1076. Result := fScheduler.Get(aTaskName);
  1077. end;
  1078. procedure TScheduledTasks.Start;
  1079. begin
  1080. if fIsStarted then Exit;
  1081. if not Assigned(fScheduler) then
  1082. begin
  1083. fScheduler := TScheduler.Create(fTaskList);
  1084. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1085. end;
  1086. fScheduler.Start;
  1087. fIsStarted := True;
  1088. end;
  1089. procedure TScheduledTasks.Stop;
  1090. begin
  1091. if Assigned(fScheduler) then fScheduler.Terminate;
  1092. fIsStarted := False;
  1093. end;
  1094. { TScheduledTask }
  1095. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1096. begin
  1097. Result := Self;
  1098. ClearSchedule;
  1099. fScheduleMode := TScheduleMode.smRunOnce;
  1100. fStartDate := aStartDate;
  1101. fNextExecution := aStartDate;
  1102. end;
  1103. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1104. begin
  1105. Result := Self;
  1106. ClearSchedule;
  1107. fScheduleMode := TScheduleMode.smRunOnce;
  1108. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1109. fNextExecution := fStartDate;
  1110. end;
  1111. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1112. begin
  1113. Result := Self;
  1114. ClearSchedule;
  1115. fScheduleMode := TScheduleMode.smRunOnce;
  1116. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1117. fNextExecution := fStartDate;
  1118. end;
  1119. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1120. begin
  1121. Result := Self;
  1122. ClearSchedule;
  1123. fScheduleMode := TScheduleMode.smRunOnce;
  1124. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1125. fNextExecution := fStartDate;
  1126. end;
  1127. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1128. begin
  1129. if fStartDate = 0.0 then ClearSchedule;
  1130. fScheduleMode := TScheduleMode.smRepeatMode;
  1131. fTimeMeasure := aTimeMeasure;
  1132. fTimeInterval := aInterval;
  1133. if fStartDate = 0.0 then fStartDate := Now();
  1134. fNextExecution := fStartDate;
  1135. fEnabled := True;
  1136. end;
  1137. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1138. begin
  1139. if fStartDate = 0.0 then ClearSchedule;
  1140. fScheduleMode := TScheduleMode.smRepeatMode;
  1141. fTimeMeasure := aTimeMeasure;
  1142. fTimeInterval := aInterval;
  1143. if fStartDate = 0.0 then fStartDate := Now();
  1144. fExpirationDate := aEndTime;
  1145. fNextExecution := fStartDate;
  1146. fEnabled := True;
  1147. end;
  1148. procedure TScheduledTask.RepeatEveryDay;
  1149. begin
  1150. RepeatEvery(1,tmDays);
  1151. end;
  1152. procedure TScheduledTask.RepeatEveryWeek;
  1153. begin
  1154. RepeatEvery(7,tmDays);
  1155. end;
  1156. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1157. begin
  1158. if fStartDate = 0.0 then ClearSchedule;
  1159. fScheduleMode := TScheduleMode.smRepeatMode;
  1160. fTimeMeasure := aTimeMeasure;
  1161. fTimeInterval := aInterval;
  1162. if fStartDate = 0.0 then fStartDate := Now();
  1163. fExpirationTimes := aRepeatTimes;
  1164. fNextExecution := fStartDate;
  1165. fEnabled := True;
  1166. end;
  1167. procedure TScheduledTask.RunOnce;
  1168. begin
  1169. fScheduleMode := TScheduleMode.smRunOnce;
  1170. if fStartDate = 0.0 then fStartDate := Now();
  1171. fNextExecution := fStartDate;
  1172. fEnabled := True;
  1173. end;
  1174. procedure TScheduledTask.Cancel;
  1175. begin
  1176. fFinished := True;
  1177. end;
  1178. function TScheduledTask.CheckSchedule: Boolean;
  1179. begin
  1180. Result := False;
  1181. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1182. if fScheduleMode = TScheduleMode.smRunOnce then
  1183. begin
  1184. //if start date reached
  1185. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1186. begin
  1187. fLastExecution := Now();
  1188. Inc(fExecutionTimes);
  1189. fFinished := True;
  1190. Result := True;
  1191. end;
  1192. end
  1193. else
  1194. begin
  1195. //if next execution reached
  1196. if Now() >= fNextExecution then
  1197. begin
  1198. //check expiration limits
  1199. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1200. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1201. begin
  1202. fFinished := True;
  1203. Exit;
  1204. end;
  1205. //calculate next execution
  1206. case fTimeMeasure of
  1207. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1208. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1209. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1210. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1211. end;
  1212. fLastExecution := Now();
  1213. Inc(fExecutionTimes);
  1214. Result := True;
  1215. end;
  1216. end;
  1217. end;
  1218. procedure TScheduledTask.ClearSchedule;
  1219. begin
  1220. inherited;
  1221. fFinished := False;
  1222. fStartDate := 0.0;
  1223. fLastExecution := 0.0;
  1224. fNextExecution := 0.0;
  1225. fExpirationDate := 0.0;
  1226. fScheduleMode := TScheduleMode.smRunOnce;
  1227. fTimeMeasure := TTimeMeasure.tmSeconds;
  1228. fTimeInterval := 0;
  1229. end;
  1230. procedure TScheduledTask.DoExpire;
  1231. begin
  1232. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1233. fEnabled := False;
  1234. end;
  1235. function TScheduledTask.GetTaskName: string;
  1236. begin
  1237. Result := fName;
  1238. end;
  1239. function TScheduledTask.IsFinished: Boolean;
  1240. begin
  1241. Result := fFinished;
  1242. end;
  1243. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1244. begin
  1245. fExceptProc := aTaskProc;
  1246. Result := Self;
  1247. end;
  1248. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1249. begin
  1250. Result := OnException(aTaskProc);
  1251. TTask(Result).ExceptionWithSync := True;
  1252. end;
  1253. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1254. begin
  1255. fExpiredProc := aTaskProc;
  1256. Result := Self;
  1257. end;
  1258. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1259. begin
  1260. Result := OnExpired(aTaskProc);
  1261. TScheduledTask(Result).ExpireWithSync := True;
  1262. end;
  1263. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1264. begin
  1265. fTerminateProc := aTaskProc;
  1266. Result := Self;
  1267. end;
  1268. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1269. begin
  1270. Result := OnTerminated(aTaskProc);
  1271. TTask(Result).TerminateWithSync := True;
  1272. end;
  1273. { TScheduledWorker }
  1274. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1275. begin
  1276. inherited Create(aNumWorker,nil);
  1277. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1278. FreeOnTerminate := True;
  1279. fCurrentTask := aScheduledTask;
  1280. end;
  1281. procedure TScheduledWorker.Execute;
  1282. begin
  1283. fStatus := TWorkerStatus.wsIdle;
  1284. if Assigned(fCurrentTask) then
  1285. begin
  1286. try
  1287. fStatus := TWorkerStatus.wsWorking;
  1288. try
  1289. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1290. else fCurrentTask.DoExecute;
  1291. fStatus := TWorkerStatus.wsIdle;
  1292. except
  1293. on E : Exception do
  1294. begin
  1295. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1296. else raise Exception.Create(e.Message);
  1297. end;
  1298. end;
  1299. finally
  1300. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1301. else fCurrentTask.DoTerminate;
  1302. //check if expired
  1303. if (fCurrentTask as IScheduledTask).IsFinished then
  1304. begin
  1305. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1306. else (fCurrentTask as IScheduledTask).DoExpire;
  1307. end;
  1308. end;
  1309. end;
  1310. fCurrentTask := nil;
  1311. fStatus := TWorkerStatus.wsSuspended;
  1312. end;
  1313. procedure TScheduledWorker.ExpireTask;
  1314. begin
  1315. (fCurrentTask as IScheduledTask).DoExpire;
  1316. end;
  1317. { TScheduler }
  1318. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1319. begin
  1320. inherited Create(True);
  1321. FreeOnTerminate := False;
  1322. fListLock := TCriticalSection.Create;
  1323. fRemoveTaskAfterExpiration := False;
  1324. fTaskList := aTaskList;
  1325. {$IFDEF FPC}
  1326. fCondVar := TSimpleEvent.Create;
  1327. {$ELSE}
  1328. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1329. {$ENDIF}
  1330. end;
  1331. destructor TScheduler.Destroy;
  1332. begin
  1333. fCondVar.SetEvent;
  1334. fCondVar.Free;
  1335. fTaskList := nil;
  1336. fListLock.Free;
  1337. inherited;
  1338. end;
  1339. procedure TScheduler.Execute;
  1340. var
  1341. task : IScheduledTask;
  1342. worker : TScheduledWorker;
  1343. numworker : Int64;
  1344. begin
  1345. numworker := 0;
  1346. while not Terminated do
  1347. begin
  1348. fListLock.Enter;
  1349. try
  1350. for task in fTaskList do
  1351. begin
  1352. if (task.IsEnabled) and (not task.IsFinished) then
  1353. begin
  1354. if task.CheckSchedule then
  1355. begin
  1356. Inc(numworker);
  1357. worker := TScheduledWorker.Create(numworker,task);
  1358. worker.Start;
  1359. end;
  1360. end
  1361. else
  1362. begin
  1363. if task.IsEnabled then
  1364. begin
  1365. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1366. // else task.DoExpire;
  1367. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1368. end;
  1369. end;
  1370. end;
  1371. task := nil;
  1372. finally
  1373. fListLock.Leave;
  1374. end;
  1375. fCondVar.WaitFor(250);
  1376. end;
  1377. end;
  1378. procedure TScheduler.ExpireTask;
  1379. begin
  1380. end;
  1381. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1382. begin
  1383. Result := fTaskList.Add(aTask);
  1384. end;
  1385. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1386. var
  1387. task : IScheduledTask;
  1388. begin
  1389. fListLock.Enter;
  1390. try
  1391. for task in fTaskList do
  1392. begin
  1393. if task.IdTask = aIdTask then Exit(task);
  1394. end;
  1395. finally
  1396. fListLock.Leave;
  1397. end;
  1398. end;
  1399. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1400. var
  1401. task : IScheduledTask;
  1402. begin
  1403. fListLock.Enter;
  1404. try
  1405. for task in fTaskList do
  1406. begin
  1407. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1408. end;
  1409. finally
  1410. fListLock.Leave;
  1411. end;
  1412. end;
  1413. { TAdvThread }
  1414. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1415. begin
  1416. inherited Create(True);
  1417. FreeOnTerminate := True;
  1418. fExecuteWithSync := aSynchronize;
  1419. fExecuteProc := aProc;
  1420. end;
  1421. procedure TAdvThread.DoExecute;
  1422. begin
  1423. if Assigned(fExecuteProc) then fExecuteProc;
  1424. end;
  1425. procedure TAdvThread.CallToTerminate;
  1426. begin
  1427. if Assigned(fTerminateProc) then fTerminateProc;
  1428. end;
  1429. procedure TAdvThread.DoTerminate;
  1430. begin
  1431. if fTerminateWithSync then Synchronize(CallToTerminate)
  1432. else CallToTerminate;
  1433. end;
  1434. procedure TAdvThread.Execute;
  1435. begin
  1436. if fExecuteWithSync then Synchronize(DoExecute)
  1437. else DoExecute;
  1438. end;
  1439. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1440. begin
  1441. fTerminateWithSync := aSynchronize;
  1442. fTerminateProc := aProc;
  1443. end;
  1444. end.