Quick.Threads.pas 45 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 31/07/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. Quick.Commons,
  31. //Quick.Chrono,
  32. Quick.Value,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFDEF FPC}
  126. TProc = procedure of object;
  127. {$ENDIF}
  128. TAdvThread = class(TThread)
  129. private
  130. fExecuteProc : TProc;
  131. fTerminateProc : TProc;
  132. fExecuteWithSync : Boolean;
  133. fTerminateWithSync : Boolean;
  134. procedure DoExecute;
  135. procedure CallToTerminate;
  136. protected
  137. procedure DoTerminate; override;
  138. public
  139. constructor Create(aProc : TProc; aSynchronize : Boolean);
  140. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  141. procedure Execute; override;
  142. end;
  143. IAnonymousThread = interface
  144. procedure Start;
  145. function OnTerminate(aProc : TProc) : IAnonymousThread;
  146. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  147. end;
  148. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  149. private
  150. fThread : TAdvThread;
  151. constructor Create(aProc : TProc; aSynchronize : Boolean);
  152. public
  153. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  154. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  155. procedure Start;
  156. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  157. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  158. end;
  159. TParamArray = array of TFlexValue;
  160. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  161. TScheduleMode = (smRunOnce, smRepeatMode);
  162. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  163. ITask = interface
  164. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  165. function GetParam(aIndex : Integer) : TFlexValue;
  166. function TaskStatus : TWorkTaskStatus;
  167. function GetNumWorker : Integer;
  168. procedure SetNumWorker(Value : Integer);
  169. function GetIdTask : Int64;
  170. procedure SetIdTask(Value : Int64);
  171. procedure DoExecute;
  172. procedure DoException(aException : Exception);
  173. procedure DoTerminate;
  174. property Param[index : Integer] : TFlexValue read GetParam;
  175. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  176. property IdTask : Int64 read GetIdTask;
  177. function IsEnabled : Boolean;
  178. end;
  179. {$IFNDEF FPC}
  180. TTaskProc = reference to procedure(task : ITask);
  181. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  182. {$ELSE}
  183. TTaskProc = procedure(task : ITask) of object;
  184. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  185. {$ENDIF}
  186. IWorkTask = interface(ITask)
  187. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  188. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  189. procedure Run;
  190. end;
  191. IScheduledTask = interface(ITask)
  192. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  193. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  194. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  195. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  196. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  197. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  198. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  199. function CheckSchedule : Boolean;
  200. procedure DoExpire;
  201. function GetTaskName : string;
  202. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  203. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  204. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  205. function StartOnDayChange : IScheduledTask;
  206. function StartNow : IScheduledTask;
  207. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  208. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  209. procedure RunOnce;
  210. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  211. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  212. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  213. procedure RepeatEveryDay;
  214. procedure RepeatEveryWeek;
  215. function IsFinished : Boolean;
  216. procedure Cancel;
  217. property Name : string read GetTaskName;
  218. end;
  219. TTask = class(TInterfacedObject,ITask)
  220. private
  221. fIdTask : Int64;
  222. fNumWorker : Integer;
  223. fParamArray : TParamArray;
  224. fExecuteProc : TTaskProc;
  225. fExceptProc : TTaskExceptionProc;
  226. fTerminateProc : TTaskProc;
  227. fExpiredProc : TTaskProc;
  228. fTaskStatus : TWorkTaskStatus;
  229. fOwnedParams : Boolean;
  230. fEnabled : Boolean;
  231. fExecuteWithSync : Boolean;
  232. fExceptionWithSync : Boolean;
  233. fTerminateWithSync : Boolean;
  234. function GetParam(aIndex : Integer) : TFlexValue;
  235. procedure DoExecute;
  236. procedure DoException(aException : Exception);
  237. procedure DoTerminate;
  238. function GetNumWorker : Integer;
  239. procedure SetNumWorker(Value : Integer);
  240. function GetIdTask : Int64;
  241. procedure SetIdTask(Value : Int64);
  242. protected
  243. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  244. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  245. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  246. public
  247. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  248. destructor Destroy; override;
  249. property IdTask : Int64 read GetIdTask;
  250. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  251. function IsEnabled : Boolean;
  252. function TaskStatus : TWorkTaskStatus;
  253. end;
  254. TWorkTask = class(TTask,IWorkTask)
  255. public
  256. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  257. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  258. procedure Run; virtual;
  259. end;
  260. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  261. TScheduledTask = class(TTask,IScheduledTask)
  262. private
  263. fName : string;
  264. fcurrentschedule : TPair<Integer, TTimeMeasure>;
  265. fExecutionTimes : Integer;
  266. fScheduleMode : TScheduleMode;
  267. fTimeInterval : Integer;
  268. fTimeMeasure : TTimeMeasure;
  269. fStartDate : TDateTime;
  270. fLastExecution : TDateTime;
  271. fNextExecution : TDateTime;
  272. fExpirationDate : TDateTime;
  273. fExpirationTimes : Integer;
  274. fFinished : Boolean;
  275. fExpireWithSync: Boolean;
  276. procedure ClearSchedule;
  277. function CheckSchedule : Boolean;
  278. procedure DoExpire;
  279. function GetTaskName : string;
  280. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  281. protected
  282. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  283. public
  284. property Name : string read fName write fName;
  285. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  286. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  287. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  288. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  289. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  290. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  291. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  292. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  293. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  294. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  295. function StartOnDayChange : IScheduledTask;
  296. function StartNow : IScheduledTask;
  297. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  298. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  299. procedure RunOnce;
  300. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  301. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  302. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  303. procedure RepeatEveryDay;
  304. procedure RepeatEveryWeek;
  305. function IsFinished : Boolean;
  306. procedure Cancel;
  307. end;
  308. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  309. TWorker = class(TThread)
  310. private
  311. fNumWorker : Integer;
  312. fCurrentIdTask : Integer;
  313. fStatus : TWorkerStatus;
  314. fTaskQueue : TTaskQueue;
  315. procedure ExecuteTask;
  316. procedure TerminateTask;
  317. protected
  318. fCurrentTask : ITask;
  319. public
  320. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  321. property NumWorker : Integer read fNumWorker;
  322. property Status : TWorkerStatus read fStatus;
  323. procedure Execute; override;
  324. end;
  325. TScheduledWorker = class(TWorker)
  326. private
  327. procedure ExpireTask;
  328. public
  329. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  330. procedure Execute; override;
  331. end;
  332. TWorkerPool = TObjectList<TWorker>;
  333. TBackgroundTasks = class
  334. private
  335. fMaxQueue : Integer;
  336. fWorkerPool : TWorkerPool;
  337. fConcurrentWorkers : Integer;
  338. fInsertTimeout : Cardinal;
  339. fExtractTimeout : Cardinal;
  340. fTaskQueue : TTaskQueue;
  341. fNumPushedTasks : Int64;
  342. function GetTaskQueue : Cardinal;
  343. public
  344. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  345. destructor Destroy; override;
  346. property MaxQueue : Integer read fMaxQueue;
  347. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  348. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  349. property TaskQueued : Cardinal read GetTaskQueue;
  350. property NumPushedTasks : Int64 read fNumPushedTasks;
  351. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  352. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  353. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  354. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  355. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  356. procedure Start;
  357. procedure CancelAll;
  358. end;
  359. TScheduledTaskList = TList<IScheduledTask>;
  360. TScheduler = class(TThread)
  361. private
  362. fListLock : TCriticalSection;
  363. fCondVar : TSimpleEvent;
  364. fTaskList : TScheduledTaskList;
  365. fRemoveTaskAfterExpiration : Boolean;
  366. public
  367. constructor Create(aTaskList : TScheduledTaskList);
  368. destructor Destroy; override;
  369. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  370. procedure Execute; override;
  371. function Add(aTask : TScheduledTask) : Integer;
  372. function Get(aIdTask : Int64) : IScheduledTask; overload;
  373. function Get(const aTaskName : string) : IScheduledTask; overload;
  374. end;
  375. TScheduledTasks = class
  376. private
  377. fTaskList : TScheduledTaskList;
  378. fScheduler : TScheduler;
  379. fNumPushedTasks : Int64;
  380. fRemoveTaskAfterExpiration : Boolean;
  381. fIsStarted : Boolean;
  382. public
  383. constructor Create;
  384. destructor Destroy; override;
  385. property NumPushedTasks : Int64 read fNumPushedTasks;
  386. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  387. property IsStarted : Boolean read fIsStarted;
  388. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  389. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  390. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  391. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  392. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  393. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  394. procedure Start;
  395. procedure Stop;
  396. end;
  397. implementation
  398. { TThreadedQueueCS<T> }
  399. procedure TThreadedQueueCS<T>.Clear;
  400. var
  401. obj : T;
  402. begin
  403. FQueueLock.Enter;
  404. try
  405. for obj in FQueue do
  406. begin
  407. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  408. end;
  409. SetLength(FQueue,0);
  410. finally
  411. FQueueLock.Leave;
  412. end;
  413. {$IFDEF FPC}
  414. FQueueCondVar.SetEvent;
  415. {$ELSE}
  416. FQueueCondVar.ReleaseAll;
  417. {$ENDIF}
  418. end;
  419. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  420. begin
  421. inherited Create;
  422. SetLength(FQueue, AQueueDepth);
  423. FQueueLock := TCriticalSection.Create;
  424. {$IFDEF FPC}
  425. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  426. {$ELSE}
  427. FQueueCondVar := TConditionVariableCS.Create;
  428. {$ENDIF}
  429. FPushTimeout := PushTimeout;
  430. FPopTimeout := PopTimeout;
  431. end;
  432. destructor TThreadedQueueCS<T>.Destroy;
  433. begin
  434. DoShutDown;
  435. FQueueLock.Free;
  436. FQueueCondVar.Free;
  437. inherited;
  438. end;
  439. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  440. begin
  441. FQueueLock.Enter;
  442. try
  443. SetLength(FQueue, Length(FQueue) + ADelta);
  444. finally
  445. FQueueLock.Leave;
  446. end;
  447. {$IFDEF FPC}
  448. FQueueCondVar.SetEvent;
  449. {$ELSE}
  450. FQueueCondVar.ReleaseAll;
  451. {$ENDIF}
  452. end;
  453. function TThreadedQueueCS<T>.PopItem: T;
  454. var
  455. LQueueSize: Integer;
  456. begin
  457. PopItem(LQueueSize, Result);
  458. end;
  459. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  460. begin
  461. AItem := Default(T);
  462. FQueueLock.Enter;
  463. try
  464. Result := wrSignaled;
  465. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  466. begin
  467. {$IFDEF FPC}
  468. Result := FQueueCondVar.WaitFor(FPopTimeout);
  469. {$ELSE}
  470. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  471. {$ENDIF}
  472. end;
  473. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  474. AItem := FQueue[FQueueOffset];
  475. FQueue[FQueueOffset] := Default(T);
  476. if FQueueSize = Length(FQueue) then
  477. begin
  478. {$IFDEF FPC}
  479. FQueueCondVar.SetEvent;
  480. {$ELSE}
  481. FQueueCondVar.ReleaseAll;
  482. {$ENDIF}
  483. end;
  484. Dec(FQueueSize);
  485. Inc(FQueueOffset);
  486. Inc(FTotalItemsPopped);
  487. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  488. finally
  489. AQueueSize := FQueueSize;
  490. FQueueLock.Leave;
  491. end;
  492. end;
  493. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  494. var
  495. LQueueSize: Integer;
  496. begin
  497. Result := PopItem(LQueueSize, AItem);
  498. end;
  499. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  500. begin
  501. PopItem(AQueueSize, Result);
  502. end;
  503. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  504. var
  505. LQueueSize: Integer;
  506. begin
  507. Result := PushItem(AItem, LQueueSize);
  508. end;
  509. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  510. begin
  511. FQueueLock.Enter;
  512. try
  513. Result := wrSignaled;
  514. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  515. begin
  516. {$IFDEF FPC}
  517. Result := FQueueCondVar.WaitFor(FPushTimeout);
  518. {$ELSE}
  519. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  520. {$ENDIF}
  521. end;
  522. if FShutDown or (Result <> wrSignaled) then Exit;
  523. if FQueueSize = 0 then
  524. begin
  525. {$IFDEF FPC}
  526. FQueueCondVar.SetEvent;
  527. {$ELSE}
  528. FQueueCondVar.ReleaseAll;
  529. {$ENDIF}
  530. end;
  531. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  532. Inc(FQueueSize);
  533. Inc(FTotalItemsPushed);
  534. finally
  535. AQueueSize := FQueueSize;
  536. FQueueLock.Leave;
  537. end;
  538. end;
  539. procedure TThreadedQueueCS<T>.DoShutDown;
  540. begin
  541. FShutDown := True;
  542. {$IFDEF FPC}
  543. FQueueCondVar.SetEvent;
  544. {$ELSE}
  545. FQueueCondVar.ReleaseAll;
  546. {$ENDIF}
  547. end;
  548. { TThreadedQueueList<T> }
  549. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  550. begin
  551. inherited Create;
  552. fQueue := TQueue<T>.Create;
  553. fQueue.Capacity := AQueueDepth;
  554. fQueueSize := 0;
  555. fQueueLock := TCriticalSection.Create;
  556. {$IFDEF FPC}
  557. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  558. {$ELSE}
  559. fQueueCondVar := TConditionVariableCS.Create;
  560. {$ENDIF}
  561. fPushTimeout := PushTimeout;
  562. fPopTimeout := PopTimeout;
  563. end;
  564. destructor TThreadedQueueList<T>.Destroy;
  565. begin
  566. DoShutDown;
  567. fQueueLock.Free;
  568. fQueueCondVar.Free;
  569. fQueue.Free;
  570. inherited;
  571. end;
  572. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  573. begin
  574. fQueueLock.Enter;
  575. try
  576. fQueue.Capacity := fQueue.Capacity + ADelta;
  577. finally
  578. fQueueLock.Leave;
  579. end;
  580. {$IFDEF FPC}
  581. FQueueCondVar.SetEvent;
  582. {$ELSE}
  583. FQueueCondVar.ReleaseAll;
  584. {$ENDIF}
  585. end;
  586. function TThreadedQueueList<T>.PopItem: T;
  587. var
  588. LQueueSize: Integer;
  589. begin
  590. PopItem(LQueueSize, Result);
  591. end;
  592. {$IFDEF FPC}
  593. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  594. //var
  595. //crono : TChronometer;
  596. begin
  597. AItem := Default(T);
  598. //crono := TChronometer.Create(False);
  599. try
  600. Result := wrSignaled;
  601. //writeln('popitem');
  602. //crono.Start;
  603. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  604. begin
  605. //crono.Start;
  606. Result := FQueueCondVar.WaitFor(FPopTimeout);
  607. //crono.Stop;
  608. //writeln('in: ' + crono.ElapsedTime);
  609. //if result = twaitresult.wrError then result := twaitresult.wrError;
  610. end;
  611. //crono.Stop;
  612. //writeln('out: ' + crono.ElapsedTime);
  613. fQueueLock.Enter;
  614. try
  615. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  616. AItem := fQueue.Extract;
  617. Dec(FQueueSize);
  618. Inc(fTotalItemsPopped);
  619. finally
  620. fQueueLock.Leave;
  621. end;
  622. finally
  623. AQueueSize := fQueueSize;
  624. end;
  625. end;
  626. {$ELSE}
  627. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  628. begin
  629. AItem := Default(T);
  630. fQueueLock.Enter;
  631. try
  632. Result := wrSignaled;
  633. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  634. begin
  635. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  636. end;
  637. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  638. AItem := fQueue.Extract;
  639. if fQueueSize = fQueue.Count then
  640. begin
  641. FQueueCondVar.ReleaseAll;
  642. end;
  643. Dec(FQueueSize);
  644. Inc(fTotalItemsPopped);
  645. finally
  646. AQueueSize := fQueueSize;
  647. fQueueLock.Leave;
  648. end;
  649. end;
  650. {$ENDIF}
  651. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  652. var
  653. LQueueSize: Integer;
  654. begin
  655. Result := PopItem(LQueueSize, AItem);
  656. end;
  657. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  658. begin
  659. PopItem(AQueueSize, Result);
  660. end;
  661. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  662. var
  663. LQueueSize: Integer;
  664. begin
  665. Result := PushItem(AItem, LQueueSize);
  666. end;
  667. {$IFDEF FPC}
  668. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  669. begin
  670. FQueueLock.Enter;
  671. try
  672. Result := wrSignaled;
  673. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  674. //begin
  675. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  676. //end;
  677. if fShutDown or (Result <> wrSignaled) then Exit;
  678. //if fQueueSize = 0 then
  679. //begin
  680. // FQueueCondVar.SetEvent;
  681. //end;
  682. fQueue.Enqueue(AItem);
  683. Inc(FQueueSize);
  684. Inc(fTotalItemsPushed);
  685. finally
  686. AQueueSize := fQueueSize;
  687. FQueueLock.Leave;
  688. //FQueueCondVar.SetEvent;
  689. end;
  690. end;
  691. {$ELSE}
  692. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  693. begin
  694. FQueueLock.Enter;
  695. try
  696. Result := wrSignaled;
  697. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  698. //begin
  699. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  700. //end;
  701. if fShutDown or (Result <> wrSignaled) then Exit;
  702. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  703. fQueue.Enqueue(AItem);
  704. Inc(FQueueSize);
  705. Inc(fTotalItemsPushed);
  706. finally
  707. AQueueSize := fQueueSize;
  708. FQueueLock.Leave;
  709. end;
  710. end;
  711. {$ENDIF}
  712. procedure TThreadedQueueList<T>.DoShutDown;
  713. begin
  714. fShutDown := True;
  715. {$IFDEF FPC}
  716. FQueueCondVar.SetEvent;
  717. {$ELSE}
  718. FQueueCondVar.ReleaseAll;
  719. {$ENDIF}
  720. end;
  721. {$IFNDEF FPC}
  722. { TThreadObjectList<T> }
  723. procedure TThreadObjectList<T>.Add(const Item: T);
  724. begin
  725. LockList;
  726. try
  727. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  728. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  729. finally
  730. UnlockList;
  731. end;
  732. end;
  733. procedure TThreadObjectList<T>.Clear;
  734. begin
  735. LockList;
  736. try
  737. fList.Clear;
  738. finally
  739. UnlockList;
  740. end;
  741. end;
  742. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  743. begin
  744. inherited Create;
  745. fLock := TObject.Create;
  746. fList := TObjectList<T>.Create;
  747. fDuplicates := dupIgnore;
  748. end;
  749. destructor TThreadObjectList<T>.Destroy;
  750. begin
  751. LockList;
  752. try
  753. fList.Free;
  754. inherited Destroy;
  755. finally
  756. UnlockList;
  757. fLock.Free;
  758. end;
  759. end;
  760. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  761. begin
  762. LockList;
  763. try
  764. Result := fList[aIndex];
  765. finally
  766. UnlockList;
  767. end;
  768. end;
  769. function TThreadObjectList<T>.LockList: TObjectList<T>;
  770. begin
  771. System.TMonitor.Enter(fLock);
  772. Result := fList;
  773. end;
  774. procedure TThreadObjectList<T>.Remove(const Item: T);
  775. begin
  776. RemoveItem(Item, TDirection.FromBeginning);
  777. end;
  778. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  779. begin
  780. LockList;
  781. try
  782. fList.RemoveItem(Item, Direction);
  783. finally
  784. UnlockList;
  785. end;
  786. end;
  787. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  788. begin
  789. LockList;
  790. try
  791. fList[aIndex] := aValue;
  792. finally
  793. UnlockList;
  794. end;
  795. end;
  796. procedure TThreadObjectList<T>.UnlockList;
  797. begin
  798. System.TMonitor.Exit(fLock);
  799. end;
  800. {$ENDIF}
  801. { TAnonymousThread }
  802. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  803. begin
  804. fThread := TAdvThread.Create(aProc,aSynchronize);
  805. end;
  806. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  807. begin
  808. Result := TAnonymousThread.Create(aProc,False);
  809. end;
  810. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  811. begin
  812. Result := TAnonymousThread.Create(aProc,True);
  813. end;
  814. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  815. begin
  816. Result := Self;
  817. fThread.OnTerminate(aProc,False);
  818. end;
  819. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  820. begin
  821. Result := Self;
  822. fThread.OnTerminate(aProc,True);
  823. end;
  824. procedure TAnonymousThread.Start;
  825. begin
  826. fThread.Start;
  827. end;
  828. { TTask }
  829. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  830. var
  831. i : Integer;
  832. begin
  833. fTaskStatus := TWorkTaskStatus.wtsPending;
  834. fExecuteWithSync := False;
  835. fTerminateWithSync := False;
  836. fExceptionWithSync := False;
  837. fOwnedParams := aOwnedParams;
  838. SetLength(fParamArray,High(aParamArray)+1);
  839. for i := Low(aParamArray) to High(aParamArray) do
  840. begin
  841. fParamArray[i].Create(aParamArray[i]);
  842. {$IFDEF FPC}
  843. fParamArray[i]._AddRef;
  844. {$ENDIF}
  845. end;
  846. fExecuteProc := aTaskProc;
  847. fEnabled := False;
  848. end;
  849. destructor TTask.Destroy;
  850. var
  851. i : Integer;
  852. begin
  853. //free passed params
  854. if fOwnedParams then
  855. begin
  856. for i := Low(fParamArray) to High(fParamArray) do
  857. begin
  858. {$IFDEF FPC}
  859. fParamArray[i]._Release;
  860. {$ENDIF}
  861. if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
  862. end;
  863. end;
  864. fParamArray := nil;
  865. inherited;
  866. end;
  867. procedure TTask.DoException(aException : Exception);
  868. begin
  869. fTaskStatus := TWorkTaskStatus.wtsException;
  870. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  871. else raise aException;
  872. end;
  873. procedure TTask.DoExecute;
  874. begin
  875. fTaskStatus := TWorkTaskStatus.wtsRunning;
  876. if Assigned(fExecuteProc) then fExecuteProc(Self);
  877. fTaskStatus := TWorkTaskStatus.wtsDone;
  878. end;
  879. procedure TTask.DoTerminate;
  880. begin
  881. if Assigned(fTerminateProc) then fTerminateProc(Self);
  882. end;
  883. function TTask.GetIdTask: Int64;
  884. begin
  885. Result := fIdTask;
  886. end;
  887. procedure TTask.SetIdTask(Value : Int64);
  888. begin
  889. fIdTask := Value;
  890. end;
  891. function TTask.GetNumWorker: Integer;
  892. begin
  893. Result := fNumWorker;
  894. end;
  895. function TTask.GetParam(aIndex: Integer): TFlexValue;
  896. begin
  897. Result := fParamArray[aIndex];
  898. end;
  899. function TTask.IsEnabled: Boolean;
  900. begin
  901. Result := fEnabled;
  902. end;
  903. procedure TTask.SetNumWorker(Value: Integer);
  904. begin
  905. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  906. fNumWorker := Value;
  907. end;
  908. function TTask.TaskStatus: TWorkTaskStatus;
  909. begin
  910. Result := fTaskStatus;
  911. end;
  912. { TWorkTask }
  913. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  914. begin
  915. fExceptProc := aTaskProc;
  916. Result := Self;
  917. end;
  918. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  919. begin
  920. fTerminateProc := aTaskProc;
  921. Result := Self;
  922. end;
  923. procedure TWorkTask.Run;
  924. begin
  925. fEnabled := True;
  926. end;
  927. { TBackgroundTasks }
  928. procedure TBackgroundTasks.CancelAll;
  929. begin
  930. fTaskQueue.Clear;
  931. end;
  932. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  933. begin
  934. fMaxQueue := aMaxQueue;
  935. fConcurrentWorkers := aConcurrentWorkers;
  936. fInsertTimeout := INFINITE;
  937. fExtractTimeout := 5000;
  938. fNumPushedTasks := 0;
  939. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  940. end;
  941. destructor TBackgroundTasks.Destroy;
  942. begin
  943. if Assigned(fWorkerPool) then fWorkerPool.Free;
  944. if Assigned(fTaskQueue) then fTaskQueue.Free;
  945. inherited;
  946. end;
  947. function TBackgroundTasks.GetTaskQueue: Cardinal;
  948. begin
  949. Result := fTaskQueue.QueueSize;
  950. end;
  951. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  952. begin
  953. Result := AddTask([],False,aTaskProc);
  954. end;
  955. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  956. var
  957. worktask : IWorkTask;
  958. begin
  959. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  960. Inc(fNumPushedTasks);
  961. worktask.SetIdTask(fNumPushedTasks);
  962. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  963. begin
  964. Result := worktask;
  965. end;
  966. end;
  967. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  968. begin
  969. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  970. TTask(Result).ExecuteWithSync := True;
  971. end;
  972. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  973. begin
  974. Result := AddTask_Sync([],False,aTaskProc);
  975. end;
  976. procedure TBackgroundTasks.Start;
  977. var
  978. i : Integer;
  979. worker : TWorker;
  980. begin
  981. //create workers
  982. if fWorkerPool <> nil then fWorkerPool.Free;
  983. fWorkerPool := TWorkerPool.Create(True);
  984. for i := 1 to fConcurrentWorkers do
  985. begin
  986. worker := TWorker.Create(i,fTaskQueue);
  987. fWorkerPool.Add(worker);
  988. worker.Start;
  989. end;
  990. end;
  991. { TWorker }
  992. constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  993. begin
  994. inherited Create(True);
  995. fNumWorker := aNumWorker;
  996. fStatus := TWorkerStatus.wsSuspended;
  997. fTaskQueue := aTaskQueue;
  998. FreeOnTerminate := False;
  999. end;
  1000. procedure TWorker.ExecuteTask;
  1001. begin
  1002. fCurrentTask.DoExecute;
  1003. end;
  1004. procedure TWorker.TerminateTask;
  1005. begin
  1006. fCurrentTask.DoTerminate;
  1007. end;
  1008. procedure TWorker.Execute;
  1009. begin
  1010. fStatus := TWorkerStatus.wsIdle;
  1011. while not Terminated do
  1012. begin
  1013. fCurrentTask := fTaskQueue.PopItem;
  1014. if fCurrentTask <> nil then
  1015. try
  1016. fStatus := TWorkerStatus.wsWorking;
  1017. try
  1018. fCurrentIdTask := fCurrentTask.GetIdTask;
  1019. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1020. else fCurrentTask.DoExecute;
  1021. except
  1022. on E : Exception do
  1023. begin
  1024. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1025. else raise Exception.Create(e.Message);
  1026. end;
  1027. end;
  1028. finally
  1029. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1030. else fCurrentTask.DoTerminate;
  1031. fStatus := TWorkerStatus.wsIdle;
  1032. end;
  1033. end;
  1034. fStatus := TWorkerStatus.wsSuspended
  1035. end;
  1036. { TScheduledTasks }
  1037. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1038. begin
  1039. Result := AddTask(aTaskName,[],False,aTaskProc);
  1040. end;
  1041. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1042. var
  1043. scheduletask : TScheduledTask;
  1044. begin
  1045. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1046. scheduletask.Name := aTaskName;
  1047. Inc(fNumPushedTasks);
  1048. scheduletask.SetIdTask(fNumPushedTasks);
  1049. fTaskList.Add(scheduletask);
  1050. Result := scheduletask;
  1051. end;
  1052. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1053. begin
  1054. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1055. TTask(Result).ExecuteWithSync := True;
  1056. end;
  1057. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1058. begin
  1059. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1060. end;
  1061. constructor TScheduledTasks.Create;
  1062. begin
  1063. fNumPushedTasks := 0;
  1064. fIsStarted := False;
  1065. fTaskList := TScheduledTaskList.Create;
  1066. end;
  1067. destructor TScheduledTasks.Destroy;
  1068. begin
  1069. if Assigned(fScheduler) then
  1070. begin
  1071. fScheduler.Terminate;
  1072. fScheduler.WaitFor;
  1073. fScheduler.Free;
  1074. end;
  1075. if Assigned(fTaskList) then fTaskList.Free;
  1076. inherited;
  1077. end;
  1078. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1079. begin
  1080. Result := fScheduler.Get(aIdTask);
  1081. end;
  1082. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1083. begin
  1084. if not Assigned(fScheduler) then raise Exception.Create('Scheduler must be started to get a task!');
  1085. Result := fScheduler.Get(aTaskName);
  1086. end;
  1087. procedure TScheduledTasks.Start;
  1088. begin
  1089. if fIsStarted then Exit;
  1090. if not Assigned(fScheduler) then
  1091. begin
  1092. fScheduler := TScheduler.Create(fTaskList);
  1093. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1094. end;
  1095. fScheduler.Start;
  1096. fIsStarted := True;
  1097. end;
  1098. procedure TScheduledTasks.Stop;
  1099. begin
  1100. if Assigned(fScheduler) then fScheduler.Terminate;
  1101. fIsStarted := False;
  1102. end;
  1103. { TScheduledTask }
  1104. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1105. begin
  1106. Result := Self;
  1107. ClearSchedule;
  1108. fScheduleMode := TScheduleMode.smRunOnce;
  1109. fStartDate := aStartDate;
  1110. fNextExecution := aStartDate;
  1111. end;
  1112. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1113. begin
  1114. Result := Self;
  1115. ClearSchedule;
  1116. fScheduleMode := TScheduleMode.smRunOnce;
  1117. fStartDate := IncMinute(Now(),aMinutes);
  1118. fNextExecution := fStartDate;
  1119. end;
  1120. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1121. begin
  1122. Result := Self;
  1123. ClearSchedule;
  1124. fScheduleMode := TScheduleMode.smRunOnce;
  1125. fStartDate := IncSecond(Now(),aSeconds);
  1126. fNextExecution := fStartDate;
  1127. end;
  1128. function TScheduledTask.StartNow: IScheduledTask;
  1129. begin
  1130. Result := Self;
  1131. ClearSchedule;
  1132. fScheduleMode := TScheduleMode.smRunOnce;
  1133. fStartDate := Now();
  1134. fNextExecution := fStartDate;
  1135. end;
  1136. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1137. begin
  1138. Result := Self;
  1139. ClearSchedule;
  1140. fScheduleMode := TScheduleMode.smRunOnce;
  1141. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1142. fNextExecution := fStartDate;
  1143. end;
  1144. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1145. begin
  1146. Result := Self;
  1147. ClearSchedule;
  1148. fScheduleMode := TScheduleMode.smRunOnce;
  1149. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1150. fNextExecution := fStartDate;
  1151. end;
  1152. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1153. begin
  1154. Result := Self;
  1155. ClearSchedule;
  1156. fScheduleMode := TScheduleMode.smRunOnce;
  1157. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1158. fNextExecution := fStartDate;
  1159. end;
  1160. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1161. begin
  1162. if fStartDate = 0.0 then ClearSchedule;
  1163. fScheduleMode := TScheduleMode.smRepeatMode;
  1164. fTimeMeasure := aTimeMeasure;
  1165. fTimeInterval := aInterval;
  1166. if fStartDate < Now() then fStartDate := Now();
  1167. fNextExecution := fStartDate;
  1168. fEnabled := True;
  1169. end;
  1170. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1171. begin
  1172. if fStartDate = 0.0 then ClearSchedule;
  1173. fScheduleMode := TScheduleMode.smRepeatMode;
  1174. fTimeMeasure := aTimeMeasure;
  1175. fTimeInterval := aInterval;
  1176. if fStartDate < Now() then fStartDate := Now();
  1177. fExpirationDate := aEndTime;
  1178. fNextExecution := fStartDate;
  1179. fEnabled := True;
  1180. end;
  1181. procedure TScheduledTask.RepeatEveryDay;
  1182. begin
  1183. RepeatEvery(1,tmDays);
  1184. end;
  1185. procedure TScheduledTask.RepeatEveryWeek;
  1186. begin
  1187. RepeatEvery(7,tmDays);
  1188. end;
  1189. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1190. begin
  1191. if fStartDate = 0.0 then ClearSchedule;
  1192. fScheduleMode := TScheduleMode.smRepeatMode;
  1193. fTimeMeasure := aTimeMeasure;
  1194. fTimeInterval := aInterval;
  1195. if fStartDate < Now() then fStartDate := Now();
  1196. fExpirationTimes := aRepeatTimes;
  1197. fNextExecution := fStartDate;
  1198. fEnabled := True;
  1199. end;
  1200. procedure TScheduledTask.RunOnce;
  1201. begin
  1202. fScheduleMode := TScheduleMode.smRunOnce;
  1203. if fStartDate < Now() then fStartDate := Now();
  1204. fNextExecution := fStartDate;
  1205. fEnabled := True;
  1206. end;
  1207. procedure TScheduledTask.Cancel;
  1208. begin
  1209. fFinished := True;
  1210. end;
  1211. function TScheduledTask.CheckSchedule: Boolean;
  1212. begin
  1213. Result := False;
  1214. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1215. if fScheduleMode = TScheduleMode.smRunOnce then
  1216. begin
  1217. //if start date reached
  1218. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1219. begin
  1220. fLastExecution := Now();
  1221. Inc(fExecutionTimes);
  1222. fFinished := True;
  1223. Result := True;
  1224. end;
  1225. end
  1226. else
  1227. begin
  1228. //if next execution reached
  1229. if Now() >= fNextExecution then
  1230. begin
  1231. //check expiration limits
  1232. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1233. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1234. begin
  1235. fFinished := True;
  1236. Exit;
  1237. end;
  1238. //calculate next execution
  1239. case fTimeMeasure of
  1240. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1241. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1242. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1243. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1244. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1245. end;
  1246. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1247. else
  1248. begin
  1249. fLastExecution := Now();
  1250. Inc(fExecutionTimes);
  1251. Result := True;
  1252. end;
  1253. end;
  1254. end;
  1255. end;
  1256. procedure TScheduledTask.ClearSchedule;
  1257. begin
  1258. inherited;
  1259. fFinished := False;
  1260. fStartDate := 0.0;
  1261. fLastExecution := 0.0;
  1262. fNextExecution := 0.0;
  1263. fExpirationDate := 0.0;
  1264. fScheduleMode := TScheduleMode.smRunOnce;
  1265. fTimeMeasure := TTimeMeasure.tmSeconds;
  1266. fTimeInterval := 0;
  1267. end;
  1268. procedure TScheduledTask.DoExpire;
  1269. begin
  1270. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1271. fEnabled := False;
  1272. end;
  1273. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1274. begin
  1275. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1276. end;
  1277. function TScheduledTask.GetTaskName: string;
  1278. begin
  1279. Result := fName;
  1280. end;
  1281. function TScheduledTask.IsFinished: Boolean;
  1282. begin
  1283. Result := fFinished;
  1284. end;
  1285. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1286. begin
  1287. fExceptProc := aTaskProc;
  1288. Result := Self;
  1289. end;
  1290. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1291. begin
  1292. Result := OnException(aTaskProc);
  1293. TTask(Result).ExceptionWithSync := True;
  1294. end;
  1295. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1296. begin
  1297. fExpiredProc := aTaskProc;
  1298. Result := Self;
  1299. end;
  1300. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1301. begin
  1302. Result := OnExpired(aTaskProc);
  1303. TScheduledTask(Result).ExpireWithSync := True;
  1304. end;
  1305. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1306. begin
  1307. fTerminateProc := aTaskProc;
  1308. Result := Self;
  1309. end;
  1310. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1311. begin
  1312. Result := OnTerminated(aTaskProc);
  1313. TTask(Result).TerminateWithSync := True;
  1314. end;
  1315. { TScheduledWorker }
  1316. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1317. begin
  1318. inherited Create(aNumWorker,nil);
  1319. {$IFNDEF DELPHILINUX}
  1320. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1321. {$ENDIF}
  1322. FreeOnTerminate := True;
  1323. fCurrentTask := aScheduledTask;
  1324. end;
  1325. procedure TScheduledWorker.Execute;
  1326. begin
  1327. fStatus := TWorkerStatus.wsIdle;
  1328. if Assigned(fCurrentTask) then
  1329. begin
  1330. try
  1331. fStatus := TWorkerStatus.wsWorking;
  1332. try
  1333. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1334. else fCurrentTask.DoExecute;
  1335. fStatus := TWorkerStatus.wsIdle;
  1336. except
  1337. on E : Exception do
  1338. begin
  1339. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1340. else raise Exception.Create(e.Message);
  1341. end;
  1342. end;
  1343. finally
  1344. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1345. else fCurrentTask.DoTerminate;
  1346. //check if expired
  1347. if (fCurrentTask as IScheduledTask).IsFinished then
  1348. begin
  1349. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1350. else (fCurrentTask as IScheduledTask).DoExpire;
  1351. end;
  1352. end;
  1353. end;
  1354. fCurrentTask := nil;
  1355. fStatus := TWorkerStatus.wsSuspended;
  1356. end;
  1357. procedure TScheduledWorker.ExpireTask;
  1358. begin
  1359. (fCurrentTask as IScheduledTask).DoExpire;
  1360. end;
  1361. { TScheduler }
  1362. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1363. begin
  1364. inherited Create(True);
  1365. FreeOnTerminate := False;
  1366. fListLock := TCriticalSection.Create;
  1367. fRemoveTaskAfterExpiration := False;
  1368. fTaskList := aTaskList;
  1369. {$IFDEF FPC}
  1370. fCondVar := TSimpleEvent.Create;
  1371. {$ELSE}
  1372. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1373. {$ENDIF}
  1374. end;
  1375. destructor TScheduler.Destroy;
  1376. begin
  1377. fCondVar.SetEvent;
  1378. fCondVar.Free;
  1379. fTaskList := nil;
  1380. fListLock.Free;
  1381. inherited;
  1382. end;
  1383. procedure TScheduler.Execute;
  1384. var
  1385. task : IScheduledTask;
  1386. worker : TScheduledWorker;
  1387. numworker : Int64;
  1388. begin
  1389. numworker := 0;
  1390. while not Terminated do
  1391. begin
  1392. fListLock.Enter;
  1393. try
  1394. for task in fTaskList do
  1395. begin
  1396. if (task.IsEnabled) and (not task.IsFinished) then
  1397. begin
  1398. if task.CheckSchedule then
  1399. begin
  1400. Inc(numworker);
  1401. worker := TScheduledWorker.Create(numworker,task);
  1402. worker.Start;
  1403. end;
  1404. end
  1405. else
  1406. begin
  1407. if task.IsEnabled then
  1408. begin
  1409. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1410. // else task.DoExpire;
  1411. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1412. end;
  1413. end;
  1414. end;
  1415. task := nil;
  1416. finally
  1417. fListLock.Leave;
  1418. end;
  1419. fCondVar.WaitFor(250);
  1420. end;
  1421. end;
  1422. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1423. begin
  1424. Result := fTaskList.Add(aTask);
  1425. end;
  1426. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1427. var
  1428. task : IScheduledTask;
  1429. begin
  1430. fListLock.Enter;
  1431. try
  1432. for task in fTaskList do
  1433. begin
  1434. if task.IdTask = aIdTask then Exit(task);
  1435. end;
  1436. finally
  1437. fListLock.Leave;
  1438. end;
  1439. end;
  1440. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1441. var
  1442. task : IScheduledTask;
  1443. begin
  1444. fListLock.Enter;
  1445. try
  1446. for task in fTaskList do
  1447. begin
  1448. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1449. end;
  1450. finally
  1451. fListLock.Leave;
  1452. end;
  1453. end;
  1454. { TAdvThread }
  1455. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1456. begin
  1457. inherited Create(True);
  1458. FreeOnTerminate := True;
  1459. fExecuteWithSync := aSynchronize;
  1460. fExecuteProc := aProc;
  1461. end;
  1462. procedure TAdvThread.DoExecute;
  1463. begin
  1464. if Assigned(fExecuteProc) then fExecuteProc;
  1465. end;
  1466. procedure TAdvThread.CallToTerminate;
  1467. begin
  1468. if Assigned(fTerminateProc) then fTerminateProc;
  1469. end;
  1470. procedure TAdvThread.DoTerminate;
  1471. begin
  1472. if fTerminateWithSync then Synchronize(CallToTerminate)
  1473. else CallToTerminate;
  1474. end;
  1475. procedure TAdvThread.Execute;
  1476. begin
  1477. if fExecuteWithSync then Synchronize(DoExecute)
  1478. else DoExecute;
  1479. end;
  1480. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1481. begin
  1482. fTerminateWithSync := aSynchronize;
  1483. fTerminateProc := aProc;
  1484. end;
  1485. end.