Quick.Threads.pas 44 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 28/02/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. Quick.Commons,
  31. //Quick.Chrono,
  32. Quick.Value,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFDEF FPC}
  126. TProc = procedure of object;
  127. {$ENDIF}
  128. TAdvThread = class(TThread)
  129. private
  130. fExecuteProc : TProc;
  131. fTerminateProc : TProc;
  132. fExecuteWithSync : Boolean;
  133. fTerminateWithSync : Boolean;
  134. procedure DoExecute;
  135. procedure CallToTerminate;
  136. protected
  137. procedure DoTerminate; override;
  138. public
  139. constructor Create(aProc : TProc; aSynchronize : Boolean);
  140. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  141. procedure Execute; override;
  142. end;
  143. IAnonymousThread = interface
  144. procedure Start;
  145. function OnTerminate(aProc : TProc) : IAnonymousThread;
  146. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  147. end;
  148. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  149. private
  150. fThread : TAdvThread;
  151. constructor Create(aProc : TProc; aSynchronize : Boolean);
  152. public
  153. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  154. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  155. procedure Start;
  156. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  157. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  158. end;
  159. TParamArray = array of TFlexValue;
  160. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  161. TScheduleMode = (smRunOnce, smRepeatMode);
  162. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds);
  163. ITask = interface
  164. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  165. function GetParam(aIndex : Integer) : TFlexValue;
  166. function TaskStatus : TWorkTaskStatus;
  167. function GetNumWorker : Integer;
  168. procedure SetNumWorker(Value : Integer);
  169. function GetIdTask : Int64;
  170. procedure SetIdTask(Value : Int64);
  171. procedure DoExecute;
  172. procedure DoException(aException : Exception);
  173. procedure DoTerminate;
  174. property Param[index : Integer] : TFlexValue read GetParam;
  175. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  176. property IdTask : Int64 read GetIdTask;
  177. function IsEnabled : Boolean;
  178. end;
  179. {$IFNDEF FPC}
  180. TTaskProc = reference to procedure(task : ITask);
  181. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  182. {$ELSE}
  183. TTaskProc = procedure(task : ITask) of object;
  184. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  185. {$ENDIF}
  186. IWorkTask = interface(ITask)
  187. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  188. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  189. procedure Run;
  190. end;
  191. IScheduledTask = interface(ITask)
  192. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  193. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  194. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  195. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  196. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  197. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  198. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  199. function CheckSchedule : Boolean;
  200. procedure DoExpire;
  201. function GetTaskName : string;
  202. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  203. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  204. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  205. function StartOnDayChange : IScheduledTask;
  206. function StartNow : IScheduledTask;
  207. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  208. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  209. procedure RunOnce;
  210. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  211. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  212. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  213. procedure RepeatEveryDay;
  214. procedure RepeatEveryWeek;
  215. function IsFinished : Boolean;
  216. procedure Cancel;
  217. property Name : string read GetTaskName;
  218. end;
  219. TTask = class(TInterfacedObject,ITask)
  220. private
  221. fIdTask : Int64;
  222. fNumWorker : Integer;
  223. fParamArray : TParamArray;
  224. fExecuteProc : TTaskProc;
  225. fExceptProc : TTaskExceptionProc;
  226. fTerminateProc : TTaskProc;
  227. fExpiredProc : TTaskProc;
  228. fTaskStatus : TWorkTaskStatus;
  229. fOwnedParams : Boolean;
  230. fEnabled : Boolean;
  231. fExecuteWithSync : Boolean;
  232. fExceptionWithSync : Boolean;
  233. fTerminateWithSync : Boolean;
  234. function GetParam(aIndex : Integer) : TFlexValue;
  235. procedure DoExecute;
  236. procedure DoException(aException : Exception);
  237. procedure DoTerminate;
  238. function GetNumWorker : Integer;
  239. procedure SetNumWorker(Value : Integer);
  240. function GetIdTask : Int64;
  241. procedure SetIdTask(Value : Int64);
  242. protected
  243. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  244. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  245. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  246. public
  247. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  248. destructor Destroy; override;
  249. property IdTask : Int64 read GetIdTask;
  250. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  251. function IsEnabled : Boolean;
  252. function TaskStatus : TWorkTaskStatus;
  253. end;
  254. TWorkTask = class(TTask,IWorkTask)
  255. public
  256. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  257. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  258. procedure Run; virtual;
  259. end;
  260. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  261. TScheduledTask = class(TTask,IScheduledTask)
  262. private
  263. fName : string;
  264. fExecutionTimes : Integer;
  265. fScheduleMode : TScheduleMode;
  266. fTimeInterval : Integer;
  267. fTimeMeasure : TTimeMeasure;
  268. fStartDate : TDateTime;
  269. fLastExecution : TDateTime;
  270. fNextExecution : TDateTime;
  271. fExpirationDate : TDateTime;
  272. fExpirationTimes : Integer;
  273. fFinished : Boolean;
  274. fExpireWithSync: Boolean;
  275. procedure ClearSchedule;
  276. function CheckSchedule : Boolean;
  277. procedure DoExpire;
  278. function GetTaskName : string;
  279. protected
  280. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  281. public
  282. property Name : string read fName write fName;
  283. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  284. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  285. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  286. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  287. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  288. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  289. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  290. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  291. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  292. function StartOnDayChange : IScheduledTask;
  293. function StartNow : IScheduledTask;
  294. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  295. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  296. procedure RunOnce;
  297. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  298. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  299. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  300. procedure RepeatEveryDay;
  301. procedure RepeatEveryWeek;
  302. function IsFinished : Boolean;
  303. procedure Cancel;
  304. end;
  305. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  306. TWorker = class(TThread)
  307. private
  308. fNumWorker : Integer;
  309. fCurrentIdTask : Integer;
  310. fStatus : TWorkerStatus;
  311. fTaskQueue : TTaskQueue;
  312. procedure ExecuteTask;
  313. procedure TerminateTask;
  314. protected
  315. fCurrentTask : ITask;
  316. public
  317. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  318. property NumWorker : Integer read fNumWorker;
  319. property Status : TWorkerStatus read fStatus;
  320. procedure Execute; override;
  321. end;
  322. TScheduledWorker = class(TWorker)
  323. private
  324. procedure ExpireTask;
  325. public
  326. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  327. procedure Execute; override;
  328. end;
  329. TWorkerPool = TObjectList<TWorker>;
  330. TBackgroundTasks = class
  331. private
  332. fMaxQueue : Integer;
  333. fWorkerPool : TWorkerPool;
  334. fConcurrentWorkers : Integer;
  335. fInsertTimeout : Cardinal;
  336. fExtractTimeout : Cardinal;
  337. fTaskQueue : TTaskQueue;
  338. fNumPushedTasks : Int64;
  339. function GetTaskQueue : Cardinal;
  340. public
  341. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  342. destructor Destroy; override;
  343. property MaxQueue : Integer read fMaxQueue;
  344. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  345. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  346. property TaskQueued : Cardinal read GetTaskQueue;
  347. property NumPushedTasks : Int64 read fNumPushedTasks;
  348. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  349. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  350. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  351. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  352. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  353. procedure Start;
  354. procedure CancelAll;
  355. end;
  356. TScheduledTaskList = TList<IScheduledTask>;
  357. TScheduler = class(TThread)
  358. private
  359. fListLock : TCriticalSection;
  360. fCondVar : TSimpleEvent;
  361. fTaskList : TScheduledTaskList;
  362. fRemoveTaskAfterExpiration : Boolean;
  363. public
  364. constructor Create(aTaskList : TScheduledTaskList);
  365. destructor Destroy; override;
  366. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  367. procedure Execute; override;
  368. function Add(aTask : TScheduledTask) : Integer;
  369. function Get(aIdTask : Int64) : IScheduledTask; overload;
  370. function Get(const aTaskName : string) : IScheduledTask; overload;
  371. end;
  372. TScheduledTasks = class
  373. private
  374. fTaskList : TScheduledTaskList;
  375. fScheduler : TScheduler;
  376. fNumPushedTasks : Int64;
  377. fRemoveTaskAfterExpiration : Boolean;
  378. fIsStarted : Boolean;
  379. public
  380. constructor Create;
  381. destructor Destroy; override;
  382. property NumPushedTasks : Int64 read fNumPushedTasks;
  383. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  384. property IsStarted : Boolean read fIsStarted;
  385. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  386. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  387. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  388. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  389. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  390. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  391. procedure Start;
  392. procedure Stop;
  393. end;
  394. implementation
  395. { TThreadedQueueCS<T> }
  396. procedure TThreadedQueueCS<T>.Clear;
  397. var
  398. obj : T;
  399. begin
  400. FQueueLock.Enter;
  401. try
  402. for obj in FQueue do
  403. begin
  404. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  405. end;
  406. SetLength(FQueue,0);
  407. finally
  408. FQueueLock.Leave;
  409. end;
  410. {$IFDEF FPC}
  411. FQueueCondVar.SetEvent;
  412. {$ELSE}
  413. FQueueCondVar.ReleaseAll;
  414. {$ENDIF}
  415. end;
  416. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  417. begin
  418. inherited Create;
  419. SetLength(FQueue, AQueueDepth);
  420. FQueueLock := TCriticalSection.Create;
  421. {$IFDEF FPC}
  422. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  423. {$ELSE}
  424. FQueueCondVar := TConditionVariableCS.Create;
  425. {$ENDIF}
  426. FPushTimeout := PushTimeout;
  427. FPopTimeout := PopTimeout;
  428. end;
  429. destructor TThreadedQueueCS<T>.Destroy;
  430. begin
  431. DoShutDown;
  432. FQueueLock.Free;
  433. FQueueCondVar.Free;
  434. inherited;
  435. end;
  436. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  437. begin
  438. FQueueLock.Enter;
  439. try
  440. SetLength(FQueue, Length(FQueue) + ADelta);
  441. finally
  442. FQueueLock.Leave;
  443. end;
  444. {$IFDEF FPC}
  445. FQueueCondVar.SetEvent;
  446. {$ELSE}
  447. FQueueCondVar.ReleaseAll;
  448. {$ENDIF}
  449. end;
  450. function TThreadedQueueCS<T>.PopItem: T;
  451. var
  452. LQueueSize: Integer;
  453. begin
  454. PopItem(LQueueSize, Result);
  455. end;
  456. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  457. begin
  458. AItem := Default(T);
  459. FQueueLock.Enter;
  460. try
  461. Result := wrSignaled;
  462. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  463. begin
  464. {$IFDEF FPC}
  465. Result := FQueueCondVar.WaitFor(FPopTimeout);
  466. {$ELSE}
  467. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  468. {$ENDIF}
  469. end;
  470. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  471. AItem := FQueue[FQueueOffset];
  472. FQueue[FQueueOffset] := Default(T);
  473. if FQueueSize = Length(FQueue) then
  474. begin
  475. {$IFDEF FPC}
  476. FQueueCondVar.SetEvent;
  477. {$ELSE}
  478. FQueueCondVar.ReleaseAll;
  479. {$ENDIF}
  480. end;
  481. Dec(FQueueSize);
  482. Inc(FQueueOffset);
  483. Inc(FTotalItemsPopped);
  484. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  485. finally
  486. AQueueSize := FQueueSize;
  487. FQueueLock.Leave;
  488. end;
  489. end;
  490. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  491. var
  492. LQueueSize: Integer;
  493. begin
  494. Result := PopItem(LQueueSize, AItem);
  495. end;
  496. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  497. begin
  498. PopItem(AQueueSize, Result);
  499. end;
  500. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  501. var
  502. LQueueSize: Integer;
  503. begin
  504. Result := PushItem(AItem, LQueueSize);
  505. end;
  506. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  507. begin
  508. FQueueLock.Enter;
  509. try
  510. Result := wrSignaled;
  511. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  512. begin
  513. {$IFDEF FPC}
  514. Result := FQueueCondVar.WaitFor(FPushTimeout);
  515. {$ELSE}
  516. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  517. {$ENDIF}
  518. end;
  519. if FShutDown or (Result <> wrSignaled) then Exit;
  520. if FQueueSize = 0 then
  521. begin
  522. {$IFDEF FPC}
  523. FQueueCondVar.SetEvent;
  524. {$ELSE}
  525. FQueueCondVar.ReleaseAll;
  526. {$ENDIF}
  527. end;
  528. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  529. Inc(FQueueSize);
  530. Inc(FTotalItemsPushed);
  531. finally
  532. AQueueSize := FQueueSize;
  533. FQueueLock.Leave;
  534. end;
  535. end;
  536. procedure TThreadedQueueCS<T>.DoShutDown;
  537. begin
  538. FShutDown := True;
  539. {$IFDEF FPC}
  540. FQueueCondVar.SetEvent;
  541. {$ELSE}
  542. FQueueCondVar.ReleaseAll;
  543. {$ENDIF}
  544. end;
  545. { TThreadedQueueList<T> }
  546. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  547. begin
  548. inherited Create;
  549. fQueue := TQueue<T>.Create;
  550. fQueue.Capacity := AQueueDepth;
  551. fQueueSize := 0;
  552. fQueueLock := TCriticalSection.Create;
  553. {$IFDEF FPC}
  554. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  555. {$ELSE}
  556. fQueueCondVar := TConditionVariableCS.Create;
  557. {$ENDIF}
  558. fPushTimeout := PushTimeout;
  559. fPopTimeout := PopTimeout;
  560. end;
  561. destructor TThreadedQueueList<T>.Destroy;
  562. begin
  563. DoShutDown;
  564. fQueueLock.Free;
  565. fQueueCondVar.Free;
  566. fQueue.Free;
  567. inherited;
  568. end;
  569. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  570. begin
  571. fQueueLock.Enter;
  572. try
  573. fQueue.Capacity := fQueue.Capacity + ADelta;
  574. finally
  575. fQueueLock.Leave;
  576. end;
  577. {$IFDEF FPC}
  578. FQueueCondVar.SetEvent;
  579. {$ELSE}
  580. FQueueCondVar.ReleaseAll;
  581. {$ENDIF}
  582. end;
  583. function TThreadedQueueList<T>.PopItem: T;
  584. var
  585. LQueueSize: Integer;
  586. begin
  587. PopItem(LQueueSize, Result);
  588. end;
  589. {$IFDEF FPC}
  590. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  591. //var
  592. //crono : TChronometer;
  593. begin
  594. AItem := Default(T);
  595. //crono := TChronometer.Create(False);
  596. try
  597. Result := wrSignaled;
  598. //writeln('popitem');
  599. //crono.Start;
  600. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  601. begin
  602. //crono.Start;
  603. Result := FQueueCondVar.WaitFor(FPopTimeout);
  604. //crono.Stop;
  605. //writeln('in: ' + crono.ElapsedTime);
  606. //if result = twaitresult.wrError then result := twaitresult.wrError;
  607. end;
  608. //crono.Stop;
  609. //writeln('out: ' + crono.ElapsedTime);
  610. fQueueLock.Enter;
  611. try
  612. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  613. AItem := fQueue.Extract;
  614. Dec(FQueueSize);
  615. Inc(fTotalItemsPopped);
  616. finally
  617. fQueueLock.Leave;
  618. end;
  619. finally
  620. AQueueSize := fQueueSize;
  621. end;
  622. end;
  623. {$ELSE}
  624. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  625. begin
  626. AItem := Default(T);
  627. fQueueLock.Enter;
  628. try
  629. Result := wrSignaled;
  630. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  631. begin
  632. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  633. end;
  634. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  635. AItem := fQueue.Extract;
  636. if fQueueSize = fQueue.Count then
  637. begin
  638. FQueueCondVar.ReleaseAll;
  639. end;
  640. Dec(FQueueSize);
  641. Inc(fTotalItemsPopped);
  642. finally
  643. AQueueSize := fQueueSize;
  644. fQueueLock.Leave;
  645. end;
  646. end;
  647. {$ENDIF}
  648. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  649. var
  650. LQueueSize: Integer;
  651. begin
  652. Result := PopItem(LQueueSize, AItem);
  653. end;
  654. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  655. begin
  656. PopItem(AQueueSize, Result);
  657. end;
  658. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  659. var
  660. LQueueSize: Integer;
  661. begin
  662. Result := PushItem(AItem, LQueueSize);
  663. end;
  664. {$IFDEF FPC}
  665. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  666. begin
  667. FQueueLock.Enter;
  668. try
  669. Result := wrSignaled;
  670. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  671. //begin
  672. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  673. //end;
  674. if fShutDown or (Result <> wrSignaled) then Exit;
  675. //if fQueueSize = 0 then
  676. //begin
  677. // FQueueCondVar.SetEvent;
  678. //end;
  679. fQueue.Enqueue(AItem);
  680. Inc(FQueueSize);
  681. Inc(fTotalItemsPushed);
  682. finally
  683. AQueueSize := fQueueSize;
  684. FQueueLock.Leave;
  685. //FQueueCondVar.SetEvent;
  686. end;
  687. end;
  688. {$ELSE}
  689. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  690. begin
  691. FQueueLock.Enter;
  692. try
  693. Result := wrSignaled;
  694. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  695. //begin
  696. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  697. //end;
  698. if fShutDown or (Result <> wrSignaled) then Exit;
  699. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  700. fQueue.Enqueue(AItem);
  701. Inc(FQueueSize);
  702. Inc(fTotalItemsPushed);
  703. finally
  704. AQueueSize := fQueueSize;
  705. FQueueLock.Leave;
  706. end;
  707. end;
  708. {$ENDIF}
  709. procedure TThreadedQueueList<T>.DoShutDown;
  710. begin
  711. fShutDown := True;
  712. {$IFDEF FPC}
  713. FQueueCondVar.SetEvent;
  714. {$ELSE}
  715. FQueueCondVar.ReleaseAll;
  716. {$ENDIF}
  717. end;
  718. {$IFNDEF FPC}
  719. { TThreadObjectList<T> }
  720. procedure TThreadObjectList<T>.Add(const Item: T);
  721. begin
  722. LockList;
  723. try
  724. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  725. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  726. finally
  727. UnlockList;
  728. end;
  729. end;
  730. procedure TThreadObjectList<T>.Clear;
  731. begin
  732. LockList;
  733. try
  734. fList.Clear;
  735. finally
  736. UnlockList;
  737. end;
  738. end;
  739. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  740. begin
  741. inherited Create;
  742. fLock := TObject.Create;
  743. fList := TObjectList<T>.Create;
  744. fDuplicates := dupIgnore;
  745. end;
  746. destructor TThreadObjectList<T>.Destroy;
  747. begin
  748. LockList;
  749. try
  750. fList.Free;
  751. inherited Destroy;
  752. finally
  753. UnlockList;
  754. fLock.Free;
  755. end;
  756. end;
  757. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  758. begin
  759. LockList;
  760. try
  761. Result := fList[aIndex];
  762. finally
  763. UnlockList;
  764. end;
  765. end;
  766. function TThreadObjectList<T>.LockList: TObjectList<T>;
  767. begin
  768. System.TMonitor.Enter(fLock);
  769. Result := fList;
  770. end;
  771. procedure TThreadObjectList<T>.Remove(const Item: T);
  772. begin
  773. RemoveItem(Item, TDirection.FromBeginning);
  774. end;
  775. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  776. begin
  777. LockList;
  778. try
  779. fList.RemoveItem(Item, Direction);
  780. finally
  781. UnlockList;
  782. end;
  783. end;
  784. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  785. begin
  786. LockList;
  787. try
  788. fList[aIndex] := aValue;
  789. finally
  790. UnlockList;
  791. end;
  792. end;
  793. procedure TThreadObjectList<T>.UnlockList;
  794. begin
  795. System.TMonitor.Exit(fLock);
  796. end;
  797. {$ENDIF}
  798. { TAnonymousThread }
  799. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  800. begin
  801. fThread := TAdvThread.Create(aProc,aSynchronize);
  802. end;
  803. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  804. begin
  805. Result := TAnonymousThread.Create(aProc,False);
  806. end;
  807. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  808. begin
  809. Result := TAnonymousThread.Create(aProc,True);
  810. end;
  811. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  812. begin
  813. Result := Self;
  814. fThread.OnTerminate(aProc,False);
  815. end;
  816. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  817. begin
  818. Result := Self;
  819. fThread.OnTerminate(aProc,True);
  820. end;
  821. procedure TAnonymousThread.Start;
  822. begin
  823. fThread.Start;
  824. end;
  825. { TTask }
  826. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  827. var
  828. i : Integer;
  829. begin
  830. fTaskStatus := TWorkTaskStatus.wtsPending;
  831. fExecuteWithSync := False;
  832. fTerminateWithSync := False;
  833. fExceptionWithSync := False;
  834. fOwnedParams := aOwnedParams;
  835. SetLength(fParamArray,High(aParamArray)+1);
  836. for i := Low(aParamArray) to High(aParamArray) do
  837. begin
  838. fParamArray[i].Create(aParamArray[i]);
  839. {$IFDEF FPC}
  840. fParamArray[i]._AddRef;
  841. {$ENDIF}
  842. end;
  843. fExecuteProc := aTaskProc;
  844. fEnabled := False;
  845. end;
  846. destructor TTask.Destroy;
  847. var
  848. i : Integer;
  849. begin
  850. //free passed params
  851. if fOwnedParams then
  852. begin
  853. for i := Low(fParamArray) to High(fParamArray) do
  854. begin
  855. {$IFDEF FPC}
  856. fParamArray[i]._Release;
  857. {$ENDIF}
  858. if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
  859. end;
  860. end;
  861. fParamArray := nil;
  862. inherited;
  863. end;
  864. procedure TTask.DoException(aException : Exception);
  865. begin
  866. fTaskStatus := TWorkTaskStatus.wtsException;
  867. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  868. else raise aException;
  869. end;
  870. procedure TTask.DoExecute;
  871. begin
  872. fTaskStatus := TWorkTaskStatus.wtsRunning;
  873. if Assigned(fExecuteProc) then fExecuteProc(Self);
  874. fTaskStatus := TWorkTaskStatus.wtsDone;
  875. end;
  876. procedure TTask.DoTerminate;
  877. begin
  878. if Assigned(fTerminateProc) then fTerminateProc(Self);
  879. end;
  880. function TTask.GetIdTask: Int64;
  881. begin
  882. Result := fIdTask;
  883. end;
  884. procedure TTask.SetIdTask(Value : Int64);
  885. begin
  886. fIdTask := Value;
  887. end;
  888. function TTask.GetNumWorker: Integer;
  889. begin
  890. Result := fNumWorker;
  891. end;
  892. function TTask.GetParam(aIndex: Integer): TFlexValue;
  893. begin
  894. Result := fParamArray[aIndex];
  895. end;
  896. function TTask.IsEnabled: Boolean;
  897. begin
  898. Result := fEnabled;
  899. end;
  900. procedure TTask.SetNumWorker(Value: Integer);
  901. begin
  902. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  903. fNumWorker := Value;
  904. end;
  905. function TTask.TaskStatus: TWorkTaskStatus;
  906. begin
  907. Result := fTaskStatus;
  908. end;
  909. { TWorkTask }
  910. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  911. begin
  912. fExceptProc := aTaskProc;
  913. Result := Self;
  914. end;
  915. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  916. begin
  917. fTerminateProc := aTaskProc;
  918. Result := Self;
  919. end;
  920. procedure TWorkTask.Run;
  921. begin
  922. fEnabled := True;
  923. end;
  924. { TBackgroundTasks }
  925. procedure TBackgroundTasks.CancelAll;
  926. begin
  927. fTaskQueue.Clear;
  928. end;
  929. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  930. begin
  931. fMaxQueue := aMaxQueue;
  932. fConcurrentWorkers := aConcurrentWorkers;
  933. fInsertTimeout := INFINITE;
  934. fExtractTimeout := 5000;
  935. fNumPushedTasks := 0;
  936. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  937. end;
  938. destructor TBackgroundTasks.Destroy;
  939. begin
  940. if Assigned(fWorkerPool) then fWorkerPool.Free;
  941. if Assigned(fTaskQueue) then fTaskQueue.Free;
  942. inherited;
  943. end;
  944. function TBackgroundTasks.GetTaskQueue: Cardinal;
  945. begin
  946. Result := fTaskQueue.QueueSize;
  947. end;
  948. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  949. begin
  950. Result := AddTask([],False,aTaskProc);
  951. end;
  952. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  953. var
  954. worktask : IWorkTask;
  955. begin
  956. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  957. Inc(fNumPushedTasks);
  958. worktask.SetIdTask(fNumPushedTasks);
  959. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  960. begin
  961. Result := worktask;
  962. end;
  963. end;
  964. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  965. begin
  966. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  967. TTask(Result).ExecuteWithSync := True;
  968. end;
  969. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  970. begin
  971. Result := AddTask_Sync([],False,aTaskProc);
  972. end;
  973. procedure TBackgroundTasks.Start;
  974. var
  975. i : Integer;
  976. worker : TWorker;
  977. begin
  978. //create workers
  979. if fWorkerPool <> nil then fWorkerPool.Free;
  980. fWorkerPool := TWorkerPool.Create(True);
  981. for i := 1 to fConcurrentWorkers do
  982. begin
  983. worker := TWorker.Create(i,fTaskQueue);
  984. fWorkerPool.Add(worker);
  985. worker.Start;
  986. end;
  987. end;
  988. { TWorker }
  989. constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  990. begin
  991. inherited Create(True);
  992. fNumWorker := aNumWorker;
  993. fStatus := TWorkerStatus.wsSuspended;
  994. fTaskQueue := aTaskQueue;
  995. FreeOnTerminate := False;
  996. end;
  997. procedure TWorker.ExecuteTask;
  998. begin
  999. fCurrentTask.DoExecute;
  1000. end;
  1001. procedure TWorker.TerminateTask;
  1002. begin
  1003. fCurrentTask.DoTerminate;
  1004. end;
  1005. procedure TWorker.Execute;
  1006. begin
  1007. fStatus := TWorkerStatus.wsIdle;
  1008. while (not Terminated) and (fTaskQueue.QueueSize > 0) do
  1009. begin
  1010. fCurrentTask := fTaskQueue.PopItem;
  1011. if fCurrentTask <> nil then
  1012. try
  1013. fStatus := TWorkerStatus.wsWorking;
  1014. try
  1015. fCurrentIdTask := fCurrentTask.GetIdTask;
  1016. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1017. else fCurrentTask.DoExecute;
  1018. except
  1019. on E : Exception do
  1020. begin
  1021. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1022. else raise Exception.Create(e.Message);
  1023. end;
  1024. end;
  1025. finally
  1026. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1027. else fCurrentTask.DoTerminate;
  1028. fStatus := TWorkerStatus.wsIdle;
  1029. end;
  1030. end;
  1031. fStatus := TWorkerStatus.wsSuspended
  1032. end;
  1033. { TScheduledTasks }
  1034. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1035. begin
  1036. Result := AddTask(aTaskName,[],False,aTaskProc);
  1037. end;
  1038. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1039. var
  1040. scheduletask : TScheduledTask;
  1041. begin
  1042. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1043. scheduletask.Name := aTaskName;
  1044. Inc(fNumPushedTasks);
  1045. scheduletask.SetIdTask(fNumPushedTasks);
  1046. fTaskList.Add(scheduletask);
  1047. Result := scheduletask;
  1048. end;
  1049. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1050. begin
  1051. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1052. TTask(Result).ExecuteWithSync := True;
  1053. end;
  1054. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1055. begin
  1056. Result := AddTask_Sync(aTaskName,aTaskProc);
  1057. end;
  1058. constructor TScheduledTasks.Create;
  1059. begin
  1060. fNumPushedTasks := 0;
  1061. fIsStarted := False;
  1062. fTaskList := TScheduledTaskList.Create;
  1063. end;
  1064. destructor TScheduledTasks.Destroy;
  1065. begin
  1066. if Assigned(fScheduler) then
  1067. begin
  1068. fScheduler.Terminate;
  1069. fScheduler.WaitFor;
  1070. fScheduler.Free;
  1071. end;
  1072. if Assigned(fTaskList) then fTaskList.Free;
  1073. inherited;
  1074. end;
  1075. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1076. begin
  1077. Result := fScheduler.Get(aIdTask);
  1078. end;
  1079. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1080. begin
  1081. if not Assigned(fScheduler) then raise Exception.Create('Scheduler must be started to get a task!');
  1082. Result := fScheduler.Get(aTaskName);
  1083. end;
  1084. procedure TScheduledTasks.Start;
  1085. begin
  1086. if fIsStarted then Exit;
  1087. if not Assigned(fScheduler) then
  1088. begin
  1089. fScheduler := TScheduler.Create(fTaskList);
  1090. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1091. end;
  1092. fScheduler.Start;
  1093. fIsStarted := True;
  1094. end;
  1095. procedure TScheduledTasks.Stop;
  1096. begin
  1097. if Assigned(fScheduler) then fScheduler.Terminate;
  1098. fIsStarted := False;
  1099. end;
  1100. { TScheduledTask }
  1101. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1102. begin
  1103. Result := Self;
  1104. ClearSchedule;
  1105. fScheduleMode := TScheduleMode.smRunOnce;
  1106. fStartDate := aStartDate;
  1107. fNextExecution := aStartDate;
  1108. end;
  1109. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1110. begin
  1111. Result := Self;
  1112. ClearSchedule;
  1113. fScheduleMode := TScheduleMode.smRunOnce;
  1114. fStartDate := IncMinute(Now(),aMinutes);
  1115. fNextExecution := fStartDate;
  1116. end;
  1117. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1118. begin
  1119. Result := Self;
  1120. ClearSchedule;
  1121. fScheduleMode := TScheduleMode.smRunOnce;
  1122. fStartDate := IncSecond(Now(),aSeconds);
  1123. fNextExecution := fStartDate;
  1124. end;
  1125. function TScheduledTask.StartNow: IScheduledTask;
  1126. begin
  1127. Result := Self;
  1128. ClearSchedule;
  1129. fScheduleMode := TScheduleMode.smRunOnce;
  1130. fStartDate := Now();
  1131. fNextExecution := fStartDate;
  1132. end;
  1133. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1134. begin
  1135. Result := Self;
  1136. ClearSchedule;
  1137. fScheduleMode := TScheduleMode.smRunOnce;
  1138. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1139. fNextExecution := fStartDate;
  1140. end;
  1141. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1142. begin
  1143. Result := Self;
  1144. ClearSchedule;
  1145. fScheduleMode := TScheduleMode.smRunOnce;
  1146. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1147. fNextExecution := fStartDate;
  1148. end;
  1149. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1150. begin
  1151. Result := Self;
  1152. ClearSchedule;
  1153. fScheduleMode := TScheduleMode.smRunOnce;
  1154. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1155. fNextExecution := fStartDate;
  1156. end;
  1157. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1158. begin
  1159. if fStartDate = 0.0 then ClearSchedule;
  1160. fScheduleMode := TScheduleMode.smRepeatMode;
  1161. fTimeMeasure := aTimeMeasure;
  1162. fTimeInterval := aInterval;
  1163. if fStartDate = 0.0 then fStartDate := Now();
  1164. fNextExecution := fStartDate;
  1165. fEnabled := True;
  1166. end;
  1167. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1168. begin
  1169. if fStartDate = 0.0 then ClearSchedule;
  1170. fScheduleMode := TScheduleMode.smRepeatMode;
  1171. fTimeMeasure := aTimeMeasure;
  1172. fTimeInterval := aInterval;
  1173. if fStartDate = 0.0 then fStartDate := Now();
  1174. fExpirationDate := aEndTime;
  1175. fNextExecution := fStartDate;
  1176. fEnabled := True;
  1177. end;
  1178. procedure TScheduledTask.RepeatEveryDay;
  1179. begin
  1180. RepeatEvery(1,tmDays);
  1181. end;
  1182. procedure TScheduledTask.RepeatEveryWeek;
  1183. begin
  1184. RepeatEvery(7,tmDays);
  1185. end;
  1186. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1187. begin
  1188. if fStartDate = 0.0 then ClearSchedule;
  1189. fScheduleMode := TScheduleMode.smRepeatMode;
  1190. fTimeMeasure := aTimeMeasure;
  1191. fTimeInterval := aInterval;
  1192. if fStartDate = 0.0 then fStartDate := Now();
  1193. fExpirationTimes := aRepeatTimes;
  1194. fNextExecution := fStartDate;
  1195. fEnabled := True;
  1196. end;
  1197. procedure TScheduledTask.RunOnce;
  1198. begin
  1199. fScheduleMode := TScheduleMode.smRunOnce;
  1200. if fStartDate = 0.0 then fStartDate := Now();
  1201. fNextExecution := fStartDate;
  1202. fEnabled := True;
  1203. end;
  1204. procedure TScheduledTask.Cancel;
  1205. begin
  1206. fFinished := True;
  1207. end;
  1208. function TScheduledTask.CheckSchedule: Boolean;
  1209. begin
  1210. Result := False;
  1211. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1212. if fScheduleMode = TScheduleMode.smRunOnce then
  1213. begin
  1214. //if start date reached
  1215. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1216. begin
  1217. fLastExecution := Now();
  1218. Inc(fExecutionTimes);
  1219. fFinished := True;
  1220. Result := True;
  1221. end;
  1222. end
  1223. else
  1224. begin
  1225. //if next execution reached
  1226. if Now() >= fNextExecution then
  1227. begin
  1228. //check expiration limits
  1229. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1230. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1231. begin
  1232. fFinished := True;
  1233. Exit;
  1234. end;
  1235. //calculate next execution
  1236. case fTimeMeasure of
  1237. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1238. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1239. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1240. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1241. end;
  1242. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1243. else
  1244. begin
  1245. fLastExecution := Now();
  1246. Inc(fExecutionTimes);
  1247. Result := True;
  1248. end;
  1249. end;
  1250. end;
  1251. end;
  1252. procedure TScheduledTask.ClearSchedule;
  1253. begin
  1254. inherited;
  1255. fFinished := False;
  1256. fStartDate := 0.0;
  1257. fLastExecution := 0.0;
  1258. fNextExecution := 0.0;
  1259. fExpirationDate := 0.0;
  1260. fScheduleMode := TScheduleMode.smRunOnce;
  1261. fTimeMeasure := TTimeMeasure.tmSeconds;
  1262. fTimeInterval := 0;
  1263. end;
  1264. procedure TScheduledTask.DoExpire;
  1265. begin
  1266. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1267. fEnabled := False;
  1268. end;
  1269. function TScheduledTask.GetTaskName: string;
  1270. begin
  1271. Result := fName;
  1272. end;
  1273. function TScheduledTask.IsFinished: Boolean;
  1274. begin
  1275. Result := fFinished;
  1276. end;
  1277. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1278. begin
  1279. fExceptProc := aTaskProc;
  1280. Result := Self;
  1281. end;
  1282. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1283. begin
  1284. Result := OnException(aTaskProc);
  1285. TTask(Result).ExceptionWithSync := True;
  1286. end;
  1287. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1288. begin
  1289. fExpiredProc := aTaskProc;
  1290. Result := Self;
  1291. end;
  1292. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1293. begin
  1294. Result := OnExpired(aTaskProc);
  1295. TScheduledTask(Result).ExpireWithSync := True;
  1296. end;
  1297. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1298. begin
  1299. fTerminateProc := aTaskProc;
  1300. Result := Self;
  1301. end;
  1302. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1303. begin
  1304. Result := OnTerminated(aTaskProc);
  1305. TTask(Result).TerminateWithSync := True;
  1306. end;
  1307. { TScheduledWorker }
  1308. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1309. begin
  1310. inherited Create(aNumWorker,nil);
  1311. {$IFNDEF DELPHILINUX}
  1312. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1313. {$ENDIF}
  1314. FreeOnTerminate := True;
  1315. fCurrentTask := aScheduledTask;
  1316. end;
  1317. procedure TScheduledWorker.Execute;
  1318. begin
  1319. fStatus := TWorkerStatus.wsIdle;
  1320. if Assigned(fCurrentTask) then
  1321. begin
  1322. try
  1323. fStatus := TWorkerStatus.wsWorking;
  1324. try
  1325. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1326. else fCurrentTask.DoExecute;
  1327. fStatus := TWorkerStatus.wsIdle;
  1328. except
  1329. on E : Exception do
  1330. begin
  1331. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1332. else raise Exception.Create(e.Message);
  1333. end;
  1334. end;
  1335. finally
  1336. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1337. else fCurrentTask.DoTerminate;
  1338. //check if expired
  1339. if (fCurrentTask as IScheduledTask).IsFinished then
  1340. begin
  1341. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1342. else (fCurrentTask as IScheduledTask).DoExpire;
  1343. end;
  1344. end;
  1345. end;
  1346. fCurrentTask := nil;
  1347. fStatus := TWorkerStatus.wsSuspended;
  1348. end;
  1349. procedure TScheduledWorker.ExpireTask;
  1350. begin
  1351. (fCurrentTask as IScheduledTask).DoExpire;
  1352. end;
  1353. { TScheduler }
  1354. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1355. begin
  1356. inherited Create(True);
  1357. FreeOnTerminate := False;
  1358. fListLock := TCriticalSection.Create;
  1359. fRemoveTaskAfterExpiration := False;
  1360. fTaskList := aTaskList;
  1361. {$IFDEF FPC}
  1362. fCondVar := TSimpleEvent.Create;
  1363. {$ELSE}
  1364. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1365. {$ENDIF}
  1366. end;
  1367. destructor TScheduler.Destroy;
  1368. begin
  1369. fCondVar.SetEvent;
  1370. fCondVar.Free;
  1371. fTaskList := nil;
  1372. fListLock.Free;
  1373. inherited;
  1374. end;
  1375. procedure TScheduler.Execute;
  1376. var
  1377. task : IScheduledTask;
  1378. worker : TScheduledWorker;
  1379. numworker : Int64;
  1380. begin
  1381. numworker := 0;
  1382. while not Terminated do
  1383. begin
  1384. fListLock.Enter;
  1385. try
  1386. for task in fTaskList do
  1387. begin
  1388. if (task.IsEnabled) and (not task.IsFinished) then
  1389. begin
  1390. if task.CheckSchedule then
  1391. begin
  1392. Inc(numworker);
  1393. worker := TScheduledWorker.Create(numworker,task);
  1394. worker.Start;
  1395. end;
  1396. end
  1397. else
  1398. begin
  1399. if task.IsEnabled then
  1400. begin
  1401. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1402. // else task.DoExpire;
  1403. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1404. end;
  1405. end;
  1406. end;
  1407. task := nil;
  1408. finally
  1409. fListLock.Leave;
  1410. end;
  1411. fCondVar.WaitFor(250);
  1412. end;
  1413. end;
  1414. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1415. begin
  1416. Result := fTaskList.Add(aTask);
  1417. end;
  1418. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1419. var
  1420. task : IScheduledTask;
  1421. begin
  1422. fListLock.Enter;
  1423. try
  1424. for task in fTaskList do
  1425. begin
  1426. if task.IdTask = aIdTask then Exit(task);
  1427. end;
  1428. finally
  1429. fListLock.Leave;
  1430. end;
  1431. end;
  1432. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1433. var
  1434. task : IScheduledTask;
  1435. begin
  1436. fListLock.Enter;
  1437. try
  1438. for task in fTaskList do
  1439. begin
  1440. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1441. end;
  1442. finally
  1443. fListLock.Leave;
  1444. end;
  1445. end;
  1446. { TAdvThread }
  1447. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1448. begin
  1449. inherited Create(True);
  1450. FreeOnTerminate := True;
  1451. fExecuteWithSync := aSynchronize;
  1452. fExecuteProc := aProc;
  1453. end;
  1454. procedure TAdvThread.DoExecute;
  1455. begin
  1456. if Assigned(fExecuteProc) then fExecuteProc;
  1457. end;
  1458. procedure TAdvThread.CallToTerminate;
  1459. begin
  1460. if Assigned(fTerminateProc) then fTerminateProc;
  1461. end;
  1462. procedure TAdvThread.DoTerminate;
  1463. begin
  1464. if fTerminateWithSync then Synchronize(CallToTerminate)
  1465. else CallToTerminate;
  1466. end;
  1467. procedure TAdvThread.Execute;
  1468. begin
  1469. if fExecuteWithSync then Synchronize(DoExecute)
  1470. else DoExecute;
  1471. end;
  1472. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1473. begin
  1474. fTerminateWithSync := aSynchronize;
  1475. fTerminateProc := aProc;
  1476. end;
  1477. end.