Quick.Threads.pas 44 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 24/01/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. Quick.Commons,
  31. //Quick.Chrono,
  32. Quick.Value,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  //Fixed-capacity thread safe queue stored in a circular array.
  //Access is serialized with a critical section; blocked callers wait on an
  //event object (FPC) or on a condition variable (Delphi).
  TThreadedQueueCS<T> = class
  private
    FQueue: array of T;
    FQueueSize: Integer;           //number of items currently stored
    FQueueOffset: Integer;         //array index of the oldest stored item
    FQueueLock: TCriticalSection;
    {$IFDEF FPC}
    FQueueCondVar: TEventObject;
    {$ELSE}
    FQueueCondVar: TConditionVariableCS;
    {$ENDIF}
    FShutDown: Boolean;
    FPushTimeout: Cardinal;
    FPopTimeout: Cardinal;
    FTotalItemsPushed: Cardinal;
    FTotalItemsPopped: Cardinal;
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    //changes the capacity of the internal array by ADelta slots
    procedure Grow(ADelta: Integer);
    //the overloads taking AQueueSize also report the item count after the call
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    //sets the shutdown flag and wakes up waiting threads
    procedure DoShutDown;
    procedure Clear;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  //Thread safe queue built on top of TQueue<T>.
  //A critical section guards the inner queue; consumers wait on a simple
  //event (FPC) or on a condition variable (Delphi).
  TThreadedQueueList<T> = class
  private
    FQueue: TQueue<T>;
    FQueueSize: Integer;           //item counter kept next to the inner queue
    FQueueLock: TCriticalSection;
    {$IFDEF FPC}
    FQueueCondVar: TSimpleEvent;
    {$ELSE}
    FQueueCondVar: TConditionVariableCS;
    {$ENDIF}
    FShutDown: Boolean;
    FPushTimeout: Cardinal;
    FPopTimeout: Cardinal;
    FTotalItemsPushed: Cardinal;
    FTotalItemsPopped: Cardinal;
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    //raises the capacity of the inner queue by ADelta
    procedure Grow(ADelta: Integer);
    //the overloads taking AQueueSize also report the item count after the call
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    //sets the shutdown flag and wakes up waiting threads
    procedure DoShutDown;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  {$IFNDEF FPC}
  //Delphi only: list wrapper whose operations run between LockList/UnlockList.
  //The items live in the inner fList; fLock is the object used for locking.
  TThreadObjectList<T: class> = class(TList<T>)
  private
    fList: TObjectList<T>;
    fLock: TObject;
    fDuplicates: TDuplicates;
    function GetItem(aIndex : Integer) : T;
    procedure SetItem(aIndex : Integer; aValue : T);
  public
    constructor Create(OwnedObjects : Boolean);
    destructor Destroy; override;
    property Items[Index : Integer] : T read GetItem write SetItem ; default;
    procedure Add(const Item: T);
    procedure Clear;
    //returns the inner list with the lock held; pair every call with UnlockList
    function LockList: TObjectList<T>;
    procedure Remove(const Item: T); inline;
    procedure RemoveItem(const Item: T; Direction: TDirection);
    procedure UnlockList; inline;
    //policy applied by Add when the item is already in the list
    property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  end;
  {$ENDIF}
  {$IFDEF FPC}
  //FPC build: TProc is declared here as a parameterless method pointer
  TProc = procedure of object;
  {$ENDIF}

  //Thread that stores an execute procedure and a terminate procedure,
  //each one paired with its own "WithSync" flag.
  TAdvThread = class(TThread)
  private
    fExecuteProc : TProc;
    fTerminateProc : TProc;
    fExecuteWithSync : Boolean;
    fTerminateWithSync : Boolean;
    procedure DoExecute;
    procedure CallToTerminate;
  protected
    procedure DoTerminate; override;
  public
    constructor Create(aProc : TProc; aSynchronize : Boolean);
    //registers the procedure (and its sync flag) tied to thread termination
    procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
    procedure Execute; override;
  end;
  //Fluent contract of an anonymous thread: both OnTerminate variants return
  //the interface again so calls can be chained before Start.
  IAnonymousThread = interface
    procedure Start;
    function OnTerminate(aProc : TProc) : IAnonymousThread;
    function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  end;
  //IAnonymousThread implementation wrapping a TAdvThread.
  //The constructor is private: instances come from Execute / Execute_Sync.
  TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  private
    fThread : TAdvThread;
    constructor Create(aProc : TProc; aSynchronize : Boolean);
  public
    class function Execute(aProc : TProc) : IAnonymousThread; overload;
    class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
    procedure Start;
    function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
    function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  end;
  //parameter values stored by a task
  TParamArray = array of TFlexValue;

  //states a task goes through (see the assignments in the TTask methods)
  TWorkTaskStatus = (wtsPending,
                     wtsAssigned,
                     wtsRunning,
                     wtsDone,
                     wtsException);

  TScheduleMode = (smRunOnce,
                   smRepeatMode);

  //unit used together with an interval value in RepeatEvery
  TTimeMeasure = (tmDays,
                  tmHours,
                  tmMinutes,
                  tmSeconds);
  //Base contract shared by work tasks and scheduled tasks.
  ITask = interface
  ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
    //accessors backing the properties declared below
    function GetParam(aIndex : Integer) : TFlexValue;
    function TaskStatus : TWorkTaskStatus;
    function GetNumWorker : Integer;
    procedure SetNumWorker(Value : Integer);
    function GetIdTask : Int64;
    procedure SetIdTask(Value : Int64);
    //execution hooks
    procedure DoExecute;
    procedure DoException(aException : Exception);
    procedure DoTerminate;
    property Param[index : Integer] : TFlexValue read GetParam;
    property NumWorker : Integer read GetNumWorker write SetNumWorker;
    //read-only as a property; SetIdTask is the only way to change the id
    property IdTask : Int64 read GetIdTask;
    function IsEnabled : Boolean;
  end;
  //Task callbacks: anonymous method references on Delphi,
  //method pointers ("of object") on FPC.
  {$IFNDEF FPC}
  TTaskProc = reference to procedure(task : ITask);
  TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  {$ELSE}
  TTaskProc = procedure(task : ITask) of object;
  TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  {$ENDIF}
  //Task contract used by the background queue; the two On... functions
  //return the task again so calls can be chained before Run.
  IWorkTask = interface(ITask)
    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
    procedure Run;
  end;
  //Task contract used by the scheduler. Callback setters and Start...
  //functions return the task again so a schedule can be written as a chain.
  IScheduledTask = interface(ITask)
  ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
    //callback registration (each one has a plain and a _Sync variant)
    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
    function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
    function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
    function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
    function CheckSchedule : Boolean;
    procedure DoExpire;
    function GetTaskName : string;
    //start time selection
    function StartAt(aStartDate : TDateTime) : IScheduledTask;
    function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
    function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
    function StartOnDayChange : IScheduledTask;
    function StartNow : IScheduledTask;
    function StartInMinutes(aMinutes : Word) : IScheduledTask;
    function StartInSeconds(aSeconds : Word) : IScheduledTask;
    //repetition selection
    procedure RunOnce;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
    procedure RepeatEveryDay;
    procedure RepeatEveryWeek;
    function IsFinished : Boolean;
    procedure Cancel;
    property Name : string read GetTaskName;
  end;
  //Common ITask implementation: keeps the parameters, the callbacks,
  //the current status and the "WithSync" flags of a task.
  TTask = class(TInterfacedObject,ITask)
  private
    fIdTask : Int64;
    fNumWorker : Integer;
    fParamArray : TParamArray;
    //callbacks
    fExecuteProc : TTaskProc;
    fExceptProc : TTaskExceptionProc;
    fTerminateProc : TTaskProc;
    fExpiredProc : TTaskProc;
    fTaskStatus : TWorkTaskStatus;
    //when True, object parameters are freed by the destructor
    fOwnedParams : Boolean;
    fEnabled : Boolean;
    fExecuteWithSync : Boolean;
    fExceptionWithSync : Boolean;
    fTerminateWithSync : Boolean;
    function GetParam(aIndex : Integer) : TFlexValue;
    procedure DoExecute;
    procedure DoException(aException : Exception);
    procedure DoTerminate;
    function GetNumWorker : Integer;
    procedure SetNumWorker(Value : Integer);
    function GetIdTask : Int64;
    procedure SetIdTask(Value : Int64);
  protected
    property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
    property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
    property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  public
    constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
    destructor Destroy; override;
    property IdTask : Int64 read GetIdTask;
    property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
    function IsEnabled : Boolean;
    function TaskStatus : TWorkTaskStatus;
  end;
  //TTask flavour implementing IWorkTask
  TWorkTask = class(TTask,IWorkTask)
  public
    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
    procedure Run; virtual;
  end;

  //queue type used to hand work tasks over to the workers
  TTaskQueue = TThreadedQueueCS<IWorkTask>;
  //TTask flavour implementing IScheduledTask; adds the schedule state
  //(start, last/next execution, expiration) to the base task.
  TScheduledTask = class(TTask,IScheduledTask)
  private
    fName : string;
    fExecutionTimes : Integer;
    fScheduleMode : TScheduleMode;
    //repeat interval: value plus unit
    fTimeInterval : Integer;
    fTimeMeasure : TTimeMeasure;
    fStartDate : TDateTime;
    fLastExecution : TDateTime;
    fNextExecution : TDateTime;
    fExpirationDate : TDateTime;
    fExpirationTimes : Integer;
    fFinished : Boolean;
    fExpireWithSync: Boolean;
    procedure ClearSchedule;
    function CheckSchedule : Boolean;
    procedure DoExpire;
    function GetTaskName : string;
  protected
    property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  public
    property Name : string read fName write fName;
    //callback registration (each one has a plain and a _Sync variant)
    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
    function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    //start time selection
    function StartAt(aStartDate : TDateTime) : IScheduledTask;
    function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
    function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
    function StartOnDayChange : IScheduledTask;
    function StartNow : IScheduledTask;
    function StartInMinutes(aMinutes : Word) : IScheduledTask;
    function StartInSeconds(aSeconds : Word) : IScheduledTask;
    //repetition selection
    procedure RunOnce;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
    procedure RepeatEveryDay;
    procedure RepeatEveryWeek;
    function IsFinished : Boolean;
    procedure Cancel;
  end;
  TWorkerStatus = (wsIdle,
                   wsWorking,
                   wsSuspended);

  //Worker thread bound to a task queue; the task in progress is kept in
  //fCurrentTask so descendants can reach it.
  TWorker = class(TThread)
  private
    fNumWorker : Integer;
    fCurrentIdTask : Integer;
    fStatus : TWorkerStatus;
    fTaskQueue : TTaskQueue;
    procedure ExecuteTask;
    procedure TerminateTask;
  protected
    fCurrentTask : ITask;
  public
    constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
    property NumWorker : Integer read fNumWorker;
    property Status : TWorkerStatus read fStatus;
    procedure Execute; override;
  end;
  //Worker created for one scheduled task instead of a task queue
  TScheduledWorker = class(TWorker)
  private
    procedure ExpireTask;
  public
    constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
    procedure Execute; override;
  end;
  TWorkerPool = TObjectList<TWorker>;

  //Owner of a task queue plus a pool of workers. AddTask overloads
  //create tasks; the _Sync variants are the synchronized counterparts.
  TBackgroundTasks = class
  private
    fMaxQueue : Integer;
    fWorkerPool : TWorkerPool;
    fConcurrentWorkers : Integer;
    //timeouts handed to the task queue when it is created
    fInsertTimeout : Cardinal;
    fExtractTimeout : Cardinal;
    fTaskQueue : TTaskQueue;
    fNumPushedTasks : Int64;
    function GetTaskQueue : Cardinal;
  public
    constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
    destructor Destroy; override;
    property MaxQueue : Integer read fMaxQueue;
    property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
    property TaskQueued : Cardinal read GetTaskQueue;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
    function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    procedure Start;
    //empties the task queue
    procedure CancelAll;
  end;
  TScheduledTaskList = TList<IScheduledTask>;

  //Thread working on a list of scheduled tasks; tasks can be looked up
  //either by id or by name.
  TScheduler = class(TThread)
  private
    fListLock : TCriticalSection;
    fCondVar : TSimpleEvent;
    fTaskList : TScheduledTaskList;
    fRemoveTaskAfterExpiration : Boolean;
    procedure ExpireTask;
  public
    constructor Create(aTaskList : TScheduledTaskList);
    destructor Destroy; override;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    procedure Execute; override;
    function Add(aTask : TScheduledTask) : Integer;
    function Get(aIdTask : Int64) : IScheduledTask; overload;
    function Get(const aTaskName : string) : IScheduledTask; overload;
  end;
  //Front end for scheduled tasks: holds the task list and the scheduler
  //thread. AddTask overloads return the task so its schedule can be chained.
  TScheduledTasks = class
  private
    fTaskList : TScheduledTaskList;
    fScheduler : TScheduler;
    fNumPushedTasks : Int64;
    fRemoveTaskAfterExpiration : Boolean;
    fIsStarted : Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    property IsStarted : Boolean read fIsStarted;
    function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    //lookup by id or by name
    function GetTask(aIdTask : Int64) : IScheduledTask; overload;
    function GetTask(const aTaskName : string) : IScheduledTask; overload;
    procedure Start;
    procedure Stop;
  end;
  395. implementation
  396. { TThreadedQueueCS<T> }
  //Discards every queued item and leaves the queue empty but usable.
  //The array keeps its length (the queue capacity) and the size/offset
  //counters are reset; shrinking the array to zero, as done before, left
  //PushItem with a zero capacity (endless wait or "mod 0").
  //As before, slots are only freed when T is exactly TObject.
  procedure TThreadedQueueCS<T>.Clear;
  var
    i : Integer;
  begin
    FQueueLock.Enter;
    try
      for i := Low(FQueue) to High(FQueue) do
      begin
        if TypeInfo(T) = TypeInfo(TObject) then PObject(@FQueue[i])^{$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
        //drop the reference so no dangling or managed value stays in the slot
        FQueue[i] := Default(T);
      end;
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    //the queue is no longer full: let blocked threads re-check their condition
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  //Creates the queue.
  //  AQueueDepth : number of slots of the circular array (queue capacity)
  //  PushTimeout : wait limit used by PushItem
  //  PopTimeout  : wait limit used by PopItem
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    SetLength(FQueue, AQueueDepth);
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    //manual-reset event created WITHOUT a name: a fixed name would make
    //every queue share the same event on platforms with named events
    FQueueCondVar := TEventObject.Create(nil, True, False, '');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
  end;
  //Flags the shutdown (which wakes waiting threads) and then releases the
  //synchronization objects.
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    DoShutDown;
    //sync objects go away only after the shutdown has been signalled
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  //Changes the queue capacity by ADelta slots.
  //When the stored items wrap around the end of the circular array, a plain
  //SetLength would leave the wrapped items at the start of the array while
  //PushItem/PopItem index with the NEW length, breaking FIFO order and
  //overwriting live items. For a positive ADelta the wrapped items are
  //therefore moved so that they follow the items stored at the old end.
  //A non-positive ADelta keeps the previous behaviour (plain SetLength).
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    oldLen, newLen, wrapped, i : Integer;
  begin
    FQueueLock.Enter;
    try
      oldLen := Length(FQueue);
      SetLength(FQueue, oldLen + ADelta);
      newLen := Length(FQueue);
      //number of items stored past the old end, i.e. in slots 0..wrapped-1
      wrapped := FQueueOffset + FQueueSize - oldLen;
      if (ADelta > 0) and (oldLen > 0) and (wrapped > 0) then
      begin
        //ascending order is safe: every target slot is either one of the
        //new slots or a wrapped slot that was already moved and emptied
        for i := 0 to wrapped - 1 do
        begin
          FQueue[(oldLen + i) mod newLen] := FQueue[i];
          FQueue[i] := Default(T);
        end;
      end;
    finally
      FQueueLock.Leave;
    end;
    //capacity changed: let blocked threads re-check their condition
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  //Convenience overload: returns the popped item and ignores both the wait
  //result and the remaining queue size.
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  //Takes the oldest item out of the queue, waiting up to FPopTimeout while
  //the queue is empty.
  //  AQueueSize : receives the number of items left when the call returns
  //  AItem      : receives the item, or Default(T) when nothing was popped
  //  Result     : wrSignaled on success or on shutdown with an empty queue
  //               (AItem is Default(T) then); otherwise the failed wait result
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        //An event, unlike a condition variable, does not release the lock:
        //waiting with FQueueLock held would keep every producer out. The
        //manual-reset event is also re-armed first, otherwise a past signal
        //makes WaitFor return at once on every loop.
        FQueueCondVar.ResetEvent;
        //re-check after the reset so a concurrent DoShutDown is not missed
        if FShutDown then Break;
        FQueueLock.Leave;
        try
          Result := FQueueCondVar.WaitFor(FPopTimeout);
        finally
          FQueueLock.Enter;
        end;
        //NOTE(review): pushers and poppers share this single event, so a
        //reset done by one side may hide a wake-up meant for the other;
        //a finite timeout bounds that delay - confirm this is acceptable
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;
      if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
      AItem := FQueue[FQueueOffset];
      //clear the slot so the array keeps no reference to the popped item
      FQueue[FQueueOffset] := Default(T);
      //the queue was full: threads blocked in PushItem can continue
      if FQueueSize = Length(FQueue) then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;
      Dec(FQueueSize);
      Inc(FQueueOffset);
      Inc(FTotalItemsPopped);
      //circular array: wrap the read position
      if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  //Pops one item and returns the wait result; the queue size is discarded.
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;

  //Pops one item, reporting the remaining size; the wait result is discarded.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;

  //Pushes one item and returns the wait result; the queue size is discarded.
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  //Appends an item, waiting up to FPushTimeout while the queue is full.
  //  AItem      : value to store
  //  AQueueSize : receives the number of stored items when the call returns
  //  Result     : wrSignaled when the item was stored or when the queue is
  //               shut down (nothing is stored then); otherwise the failed
  //               wait result
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
      begin
        {$IFDEF FPC}
        //An event, unlike a condition variable, does not release the lock:
        //waiting with FQueueLock held would keep every consumer out. The
        //manual-reset event is also re-armed first, otherwise a past signal
        //makes WaitFor return at once on every loop.
        FQueueCondVar.ResetEvent;
        //re-check after the reset so a concurrent DoShutDown is not missed
        if FShutDown then Break;
        FQueueLock.Leave;
        try
          Result := FQueueCondVar.WaitFor(FPushTimeout);
        finally
          FQueueLock.Enter;
        end;
        //NOTE(review): pushers and poppers share this single event, so a
        //reset done by one side may hide a wake-up meant for the other;
        //a finite timeout bounds that delay - confirm this is acceptable
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
        {$ENDIF}
      end;
      if FShutDown or (Result <> wrSignaled) then Exit;
      //the queue was empty: threads blocked in PopItem can continue
      if FQueueSize = 0 then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;
      //circular array: the write position follows the last stored item
      FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  //Marks the queue as shut down and wakes every waiting thread.
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    {$IFDEF FPC}
    //the lock is deliberately not taken here, so this call never depends on
    //whether an FPC waiter owns FQueueLock while it is blocked on the event;
    //the manual-reset event stays signalled once set
    FShutDown := True;
    FQueueCondVar.SetEvent;
    {$ELSE}
    //Set the flag under the lock: a thread that has already tested FShutDown
    //inside PushItem/PopItem cannot reach WaitFor before this point, so the
    //ReleaseAll below is never lost.
    FQueueLock.Enter;
    try
      FShutDown := True;
    finally
      FQueueLock.Leave;
    end;
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  546. { TThreadedQueueList<T> }
  //Creates the inner queue with an initial capacity of AQueueDepth and the
  //synchronization objects; both timeouts are stored for later use.
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    FQueue := TQueue<T>.Create;
    FQueue.Capacity := AQueueDepth;
    FQueueSize := 0;
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
  end;
  //Flags the shutdown (which wakes waiting threads) and then releases the
  //synchronization objects and the inner queue.
  destructor TThreadedQueueList<T>.Destroy;
  begin
    DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    FQueue.Free;
    inherited;
  end;
  //Adds ADelta to the capacity of the inner queue (under the lock) and then
  //signals the waiting threads.
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newCapacity : Integer;
  begin
    FQueueLock.Enter;
    try
      newCapacity := FQueue.Capacity + ADelta;
      FQueue.Capacity := newCapacity;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  //Convenience overload: returns the popped item and ignores both the wait
  //result and the remaining queue size.
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  //Takes the oldest item out of the queue, waiting up to FPopTimeout while
  //the queue is empty.
  //  AQueueSize : receives the number of items left when the call returns
  //  AItem      : receives the item, or Default(T) when nothing was popped
  //  Result     : wrSignaled on success or on shutdown with an empty queue
  //               (AItem is Default(T) then); otherwise the failed wait result
  //The emptiness test and the extraction now happen under the same lock on
  //every compiler. The former FPC code waited and tested outside the lock,
  //so two consumers could both pass the test and the second one called
  //Extract on an empty queue.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        //re-arm the manual-reset event while the queue is known to be empty;
        //PushItem signals it again on the next empty -> non-empty change
        FQueueCondVar.ResetEvent;
        //re-check after the reset so a concurrent DoShutDown is not missed
        if FShutDown then Break;
        //an event does not release the lock by itself: leave it while waiting
        FQueueLock.Leave;
        try
          Result := FQueueCondVar.WaitFor(FPopTimeout);
        finally
          FQueueLock.Enter;
        end;
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;
      if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
      AItem := FQueue.Extract;
      Dec(FQueueSize);
      Inc(FTotalItemsPopped);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;

  //Pops one item and returns the wait result; the queue size is discarded.
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;

  //Pops one item, reporting the remaining size; the wait result is discarded.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;

  //Pushes one item and returns the wait result; the queue size is discarded.
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;

  //Appends an item. This queue never blocks producers (there is no wait on a
  //full queue), so FPushTimeout is not used here.
  //  AItem      : value to store
  //  AQueueSize : receives the number of stored items when the call returns
  //  Result     : always wrSignaled; nothing is stored after a shutdown
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      if FShutDown then Exit;
      //empty -> non-empty change: wake the consumers blocked in PopItem
      //(the FPC build did not signal at all, so they only woke on timeout)
      if FQueueSize = 0 then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;
      FQueue.Enqueue(AItem);
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  //Marks the queue as shut down and wakes every waiting thread.
  //The flag is set under the lock: a consumer that has already tested
  //fShutDown inside PopItem while holding the lock cannot start waiting
  //before this point, so the wake-up below is never lost.
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    FQueueLock.Enter;
    try
      FShutDown := True;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  719. {$IFNDEF FPC}
  720. { TThreadObjectList<T> }
  //Adds Item under the list lock, honouring the Duplicates policy:
  //  dupAccept : always added
  //  dupIgnore : added only when not already present
  //  dupError  : EListError is raised when already present
  procedure TThreadObjectList<T>.Add(const Item: T);
  var
    canAdd : Boolean;
  begin
    LockList;
    try
      canAdd := (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1);
      if canAdd then
        fList.Add(Item)
      else
      begin
        if Duplicates = dupError then
          raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
      end;
    finally
      UnlockList;
    end;
  end;
  //Empties the inner list while holding the list lock.
  procedure TThreadObjectList<T>.Clear;
  var
    locked : TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.Clear;
    finally
      UnlockList;
    end;
  end;
  //Creates the lock object and the inner list.
  //  OwnedObjects : True when the inner list must free its items.
  //The parameter was previously ignored, so the inner list owned (and freed)
  //its items even when the caller asked for a non-owning list.
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    //by default a duplicated item is silently skipped by Add
    fDuplicates := dupIgnore;
  end;
  //Frees the inner list with the lock held; the lock object itself is
  //released last, once it has been unlocked.
  destructor TThreadObjectList<T>.Destroy;
  begin
    LockList;
    try
      fList.Free;
      //statement order is kept on purpose: fLock is still needed below
      inherited Destroy;
    finally
      UnlockList;
      fLock.Free;
    end;
  end;
  //Reads the item stored at aIndex with the list lock held.
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    locked : TObjectList<T>;
  begin
    locked := LockList;
    try
      Result := locked[aIndex];
    finally
      UnlockList;
    end;
  end;

  //Acquires the monitor on fLock and returns the inner list.
  //Every call must be matched by UnlockList.
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;

  //Removes Item, searching from the beginning of the list.
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    RemoveItem(Item, TDirection.FromBeginning);
  end;

  //Removes Item from the inner list, searching in the given direction,
  //with the list lock held.
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    locked : TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;

  //Stores aValue at aIndex with the list lock held.
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    locked : TObjectList<T>;
  begin
    locked := LockList;
    try
      locked[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;

  //Releases the monitor acquired by LockList.
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  798. {$ENDIF}
  799. { TAnonymousThread }
  //Wraps aProc in a TAdvThread; aSynchronize is forwarded untouched.
  constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    fThread := TAdvThread.Create(aProc,aSynchronize);
  end;

  //Factory: anonymous thread whose procedure is created with aSynchronize = False.
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  const
    WITH_SYNC = False;
  begin
    Result := TAnonymousThread.Create(aProc,WITH_SYNC);
  end;

  //Factory: anonymous thread whose procedure is created with aSynchronize = True.
  class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  const
    WITH_SYNC = True;
  begin
    Result := TAnonymousThread.Create(aProc,WITH_SYNC);
  end;

  //Registers the terminate procedure (not synchronized); returns Self for chaining.
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,False);
    Result := Self;
  end;

  //Registers the terminate procedure (synchronized); returns Self for chaining.
  function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,True);
    Result := Self;
  end;

  //Starts the wrapped thread.
  procedure TAnonymousThread.Start;
  begin
    fThread.Start;
  end;
  826. { TTask }
  //Builds a pending, disabled task.
  //  aParamArray  : values copied into the task parameter array
  //  aOwnedParams : True when object parameters must be freed with the task
  //  aTaskProc    : procedure run by DoExecute
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fEnabled := False;
    fExecuteProc := aTaskProc;
    fOwnedParams := aOwnedParams;
    //no callback is synchronized unless a descendant changes these flags
    fExecuteWithSync := False;
    fTerminateWithSync := False;
    fExceptionWithSync := False;
    //one TFlexValue per received parameter
    SetLength(fParamArray,Length(aParamArray));
    for idx := 0 to Length(aParamArray) - 1 do
    begin
      fParamArray[idx].Create(aParamArray[idx]);
      {$IFDEF FPC}
      fParamArray[idx]._AddRef;
      {$ENDIF}
    end;
  end;
  //Releases the parameter array; object parameters are freed only when the
  //task owns them (fOwnedParams).
  destructor TTask.Destroy;
  var
    idx : Integer;
  begin
    if fOwnedParams then
    begin
      for idx := 0 to Length(fParamArray) - 1 do
      begin
        //NOTE(review): on FPC the constructor calls _AddRef for every
        //parameter but _Release only runs here when the params are owned -
        //confirm against Quick.Value whether that asymmetry is intended
        {$IFDEF FPC}
        fParamArray[idx]._Release;
        {$ENDIF}
        if (fParamArray[idx].DataType = dtObject) and (fParamArray[idx].AsObject <> nil) then
          fParamArray[idx].AsObject.Free;
      end;
    end;
    fParamArray := nil;
    inherited;
  end;
  //Flags the task as failed and hands the exception to the registered
  //handler; without a handler the received exception object is raised.
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    if not Assigned(fExceptProc) then
    begin
      //NOTE(review): this raises the instance received as a parameter; if
      //callers pass the exception they are currently handling, the object
      //may end up being released twice - verify against the callers
      raise aException;
    end;
    fExceptProc(Self,aException);
  end;
  //Runs the task procedure (when assigned), moving the status from
  //wtsRunning to wtsDone. If the procedure raises, the status is left at
  //wtsRunning because the last assignment is skipped.
  procedure TTask.DoExecute;
  begin
    fTaskStatus := TWorkTaskStatus.wtsRunning;
    if Assigned(fExecuteProc) then
    begin
      fExecuteProc(Self);
    end;
    fTaskStatus := TWorkTaskStatus.wtsDone;
  end;
  // Invokes the terminate callback, if one was assigned.
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  // Returns the id stored by SetIdTask.
  function TTask.GetIdTask: Int64;
  begin
    Result := fIdTask;
  end;
  // Stores Value as the task id.
  procedure TTask.SetIdTask(Value : Int64);
  begin
    fIdTask := Value;
  end;
  889. function TTask.GetNumWorker: Integer;
  890. begin
  891. Result := fNumWorker;
  892. end;
  // Returns the stored parameter at position aIndex.
  // No explicit bounds check is performed here.
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  begin
    Result := fParamArray[aIndex];
  end;
  // Returns the current value of the enabled flag.
  function TTask.IsEnabled: Boolean;
  begin
    Result := fEnabled;
  end;
  // Stores the worker number and flags the task as assigned.
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
    fNumWorker := Value;
  end;
  // Returns the current task status.
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Result := fTaskStatus;
  end;
  910. { TWorkTask }
  // Sets the exception callback. Returns Self for call chaining.
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Sets the terminate callback. Returns Self for call chaining.
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Sets the enabled flag of the task.
  procedure TWorkTask.Run;
  begin
    fEnabled := True;
  end;
  925. { TBackgroundTasks }
  // Clears the task queue.
  procedure TBackgroundTasks.CancelAll;
  begin
    fTaskQueue.Clear;
  end;
  // Stores the worker count and queue depth and creates the task queue.
  // Inserts use an INFINITE timeout; extractions use 5000 (presumably
  // milliseconds - confirm against TThreadedQueueCS).
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fMaxQueue := aMaxQueue;
    fNumPushedTasks := 0;
    //queue timeouts
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  end;
  // Frees the worker pool first and the task queue afterwards.
  destructor TBackgroundTasks.Destroy;
  begin
    //Free is nil-safe, so no Assigned guard is required
    fWorkerPool.Free;
    fTaskQueue.Free;
    inherited;
  end;
  // Returns the QueueSize reported by the task queue.
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Result := fTaskQueue.QueueSize;
  end;
  // Queues aTaskProc as a task without parameters; delegates to the
  // parameterized overload with an empty, non-owned param array.
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  begin
    Result := AddTask([],False,aTaskProc);
  end;
  // Creates a work task for aTaskProc with the given params, assigns it the
  // next task id and pushes it to the task queue.
  // Returns the task when the push was signaled, nil otherwise.
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    worktask : IWorkTask;
  begin
    //a managed function result is not guaranteed to start out as nil, so the
    //failure path must set it explicitly
    Result := nil;
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    Inc(fNumPushedTasks);
    worktask.SetIdTask(fNumPushedTasks);
    if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
    begin
      Result := worktask;
    end;
  end;
  // Same as AddTask, but flags the task so its execution is synchronized.
  // Returns whatever AddTask returned (no task when the queue rejected it).
  function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := nil;
    Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
    //AddTask yields no task when the push fails; do not dereference it then
    if Result <> nil then TTask(Result).ExecuteWithSync := True;
  end;
  // Queues aTaskProc as a synchronized task without parameters; delegates
  // to the parameterized AddTask_Sync overload.
  function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask_Sync([],False,aTaskProc);
  end;
  // Replaces any existing worker pool with a fresh one and starts
  // fConcurrentWorkers workers on the shared task queue.
  procedure TBackgroundTasks.Start;
  var
    num : Integer;
    newworker : TWorker;
  begin
    //drop previous pool (Free is nil-safe)
    fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    //workers are numbered from 1
    for num := 1 to fConcurrentWorkers do
    begin
      newworker := TWorker.Create(num,fTaskQueue);
      fWorkerPool.Add(newworker);
      newworker.Start;
    end;
  end;
  989. { TWorker }
  // Creates the worker thread suspended, bound to aTaskQueue and identified
  // by aNumWorker. The thread is not freed automatically on termination.
  constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fStatus := TWorkerStatus.wsSuspended;
    fNumWorker := aNumWorker;
    fTaskQueue := aTaskQueue;
  end;
  // Parameterless wrapper around the current task's DoExecute, so it can be
  // handed to Synchronize.
  procedure TWorker.ExecuteTask;
  begin
    fCurrentTask.DoExecute;
  end;
  // Parameterless wrapper around the current task's DoTerminate, so it can
  // be handed to Synchronize.
  procedure TWorker.TerminateTask;
  begin
    fCurrentTask.DoTerminate;
  end;
  // Worker loop: pops tasks from the queue and runs them until the thread
  // is terminated. For every task the execute step runs first (synchronized
  // when the task asks for it), exceptions are routed to the task's
  // DoException, and the terminate step always runs afterwards.
  procedure TWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    //keep polling while the queue is empty: stopping as soon as QueueSize
    //drops to 0 would end the worker for good, so tasks queued later (or a
    //Start issued before the first AddTask) would never be processed.
    //NOTE(review): shutdown now waits for a pending PopItem to time out.
    while not Terminated do
    begin
      fCurrentTask := fTaskQueue.PopItem;
      //nothing was delivered (e.g. pop timed out)
      if fCurrentTask = nil then Continue;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fCurrentIdTask := fCurrentTask.GetIdTask;
          if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
            else fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
              else raise Exception.Create(e.Message);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
          else fCurrentTask.DoTerminate;
        fStatus := TWorkerStatus.wsIdle;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1034. { TScheduledTasks }
  // Registers a named task without parameters; delegates to the
  // parameterized overload with an empty, non-owned param array.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := AddTask(aTaskName,[],False,aTaskProc);
  end;
  // Creates a named scheduled task with the given params, assigns it the
  // next task id, appends it to the task list and returns it.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newtask : TScheduledTask;
  begin
    newtask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.Name := aTaskName;
    //ids are consecutive, starting at 1
    Inc(fNumPushedTasks);
    newtask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newtask);
    Result := newtask;
  end;
  // Same as AddTask, but flags the task so its execution is synchronized.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
    TTask(Result).ExecuteWithSync := True;
  end;
  // Registers a named, synchronized task without parameters.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  begin
    //delegate to the parameterized overload; calling AddTask_Sync with the
    //same two arguments would resolve to this method and recurse forever
    Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  end;
  // Creates the (empty) task list; the scheduler starts out not started
  // and the task id counter at zero.
  constructor TScheduledTasks.Create;
  begin
    fTaskList := TScheduledTaskList.Create;
    fIsStarted := False;
    fNumPushedTasks := 0;
  end;
  // Terminates and waits for the scheduler thread (when one exists) before
  // freeing it, then frees the task list.
  destructor TScheduledTasks.Destroy;
  begin
    if fScheduler <> nil then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    //Free is nil-safe
    fTaskList.Free;
    inherited;
  end;
  // Looks a task up by id through the scheduler.
  // Raises Exception when the scheduler has not been created yet, matching
  // the lookup-by-name overload instead of dereferencing a nil scheduler.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  begin
    if not Assigned(fScheduler) then raise Exception.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aIdTask);
  end;
  // Looks a task up by name through the scheduler.
  // Raises Exception when the scheduler has not been created yet.
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  begin
    if fScheduler = nil then
    begin
      raise Exception.Create('Scheduler must be started to get a task!');
    end;
    Result := fScheduler.Get(aTaskName);
  end;
  // Starts the scheduler thread. Does nothing when already started.
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;
    //a scheduler left over from Stop has been told to terminate and a
    //terminated thread cannot be started again: discard it and build a new
    //one so Stop/Start cycles work.
    //NOTE(review): assumes an existing scheduler was always started before.
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
      fScheduler := nil;
    end;
    fScheduler := TScheduler.Create(fTaskList);
    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    fScheduler.Start;
    fIsStarted := True;
  end;
  // Asks the scheduler thread (if any) to terminate and clears the started
  // flag. Does not wait for the thread to finish.
  procedure TScheduledTasks.Stop;
  begin
    if fScheduler <> nil then
    begin
      fScheduler.Terminate;
    end;
    fIsStarted := False;
  end;
  1101. { TScheduledTask }
  // Resets the schedule and sets a run-once start at aStartDate.
  // Returns Self for call chaining.
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := aStartDate;
    fNextExecution := aStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start aMinutes from now.
  // Returns Self for call chaining.
  function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := IncMinute(Now(),aMinutes);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start aSeconds from now.
  // Returns Self for call chaining.
  function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := IncSecond(Now(),aSeconds);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start at the current time.
  // Returns Self for call chaining.
  function TScheduledTask.StartNow: IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := Now();
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start at tomorrow 00:00:00.
  // Returns Self for call chaining.
  function TScheduledTask.StartOnDayChange: IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Resets the schedule and sets a run-once start today at the given
  // hour, minute and second. Returns Self for call chaining.
  function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    Result := Self;
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    //hour/minute/second are a time of day, so use the same time-changing
    //helper as StartTomorrowAt and StartOnDayChange; the date-changing
    //helper would take these values as date parts
    fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
  end;
  // Resets the schedule and sets a run-once start tomorrow at the given
  // hour, minute and second. Returns Self for call chaining.
  function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
    Result := Self;
  end;
  // Switches the task to repeat mode with the given interval and enables it.
  // When no start date was set, the schedule is reset and starts now.
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    //default start is the current time
    if fStartDate = 0.0 then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Switches the task to repeat mode with the given interval, stores
  // aEndTime as expiration date and enables the task.
  // When no start date was set, the schedule is reset and starts now.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    //default start is the current time
    if fStartDate = 0.0 then
    begin
      fStartDate := Now();
    end;
    fExpirationDate := aEndTime;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Repeats the task with an interval of 1 day.
  procedure TScheduledTask.RepeatEveryDay;
  begin
    RepeatEvery(1,tmDays);
  end;
  // Repeats the task with an interval of 7 days.
  procedure TScheduledTask.RepeatEveryWeek;
  begin
    RepeatEvery(7,tmDays);
  end;
  // Switches the task to repeat mode with the given interval, stores
  // aRepeatTimes as expiration count and enables the task.
  // When no start date was set, the schedule is reset and starts now.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    //default start is the current time
    if fStartDate = 0.0 then
    begin
      fStartDate := Now();
    end;
    fExpirationTimes := aRepeatTimes;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Switches the task to run-once mode and enables it. When no start date
  // was set, the current time is used.
  procedure TScheduledTask.RunOnce;
  begin
    fScheduleMode := TScheduleMode.smRunOnce;
    if fStartDate = 0.0 then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  // Marks the task as finished.
  procedure TScheduledTask.Cancel;
  begin
    fFinished := True;
  end;
  // Decides whether the task is due right now.
  // Returns True (and records the execution) when it must be launched.
  // Run-once tasks fire a single time and are flagged finished; repeating
  // tasks either expire (flagged finished, returns False) or get their next
  // execution moved forward by the configured interval.
  function TScheduledTask.CheckSchedule: Boolean;
  var
    expired : Boolean;
  begin
    Result := False;
    //never launch a task that is still running
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      //only the first execution counts
      if fExecutionTimes <> 0 then Exit;
      if Now() < fNextExecution then Exit;
    end
    else
    begin
      if Now() < fNextExecution then Exit;
      //check expiration limits
      expired := ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
                 ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate));
      if expired then
      begin
        fFinished := True;
        Exit;
      end;
      //calculate next execution
      case fTimeMeasure of
        tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
        tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
        tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
        tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      end;
    end;
    //record this execution
    fLastExecution := Now();
    Inc(fExecutionTimes);
    if fScheduleMode = TScheduleMode.smRunOnce then fFinished := True;
    Result := True;
  end;
  // Resets the schedule to its defaults: not finished, no dates, run-once
  // mode, seconds as time measure and a zero interval.
  // Execution and expiration counters are not touched here.
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    fFinished := False;
    //dates
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
    //mode and interval
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeInterval := 0;
    fTimeMeasure := TTimeMeasure.tmSeconds;
  end;
  // Invokes the expired callback (when assigned) and disables the task.
  procedure TScheduledTask.DoExpire;
  begin
    if Assigned(fExpiredProc) then
    begin
      fExpiredProc(Self);
    end;
    fEnabled := False;
  end;
  // Returns the task name.
  function TScheduledTask.GetTaskName: string;
  begin
    Result := fName;
  end;
  // Returns the current value of the finished flag.
  function TScheduledTask.IsFinished: Boolean;
  begin
    Result := fFinished;
  end;
  // Sets the exception callback. Returns Self for call chaining.
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Sets the exception callback through OnException and sets the
  // ExceptionWithSync flag on the task.
  function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := OnException(aTaskProc);
    TTask(Result).ExceptionWithSync := True;
  end;
  // Sets the expired callback. Returns Self for call chaining.
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  // Sets the expired callback through OnExpired and sets the ExpireWithSync
  // flag on the task.
  function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnExpired(aTaskProc);
    TScheduledTask(Result).ExpireWithSync := True;
  end;
  // Sets the terminate callback. Returns Self for call chaining.
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Sets the terminate callback through OnTerminated and sets the
  // TerminateWithSync flag on the task.
  function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := OnTerminated(aTaskProc);
    TTask(Result).TerminateWithSync := True;
  end;
  1304. { TScheduledWorker }
  // Creates a worker dedicated to one scheduled task. The worker has no
  // task queue (nil is passed to the inherited constructor), frees itself
  // on termination and is named after the task for debugging.
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create(aNumWorker,nil);
    //the second argument of NameThreadForDebugging is a thread id, not a
    //task id: name this worker thread rather than an unrelated id
    NameThreadForDebugging(aScheduledTask.Name,ThreadID);
    FreeOnTerminate := True;
    fCurrentTask := aScheduledTask;
  end;
  // Runs the single task bound to this worker: execute step (synchronized
  // when requested), exceptions routed to the task's DoException, then the
  // terminate step and, if the task reports finished, the expire step.
  // The task reference is released at the end.
  procedure TScheduledWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    if fCurrentTask <> nil then
    try
      fStatus := TWorkerStatus.wsWorking;
      try
        if TTask(fCurrentTask).ExecuteWithSync then
          Synchronize(ExecuteTask)
        else
          fCurrentTask.DoExecute;
        fStatus := TWorkerStatus.wsIdle;
      except
        on E : Exception do
        begin
          if fCurrentTask <> nil then fCurrentTask.DoException(E)
            else raise Exception.Create(e.Message);
        end;
      end;
    finally
      if TTask(fCurrentTask).TerminateWithSync then
        Synchronize(TerminateTask)
      else
        fCurrentTask.DoTerminate;
      //check if expired
      if (fCurrentTask as IScheduledTask).IsFinished then
      begin
        if TScheduledTask(fCurrentTask).ExpireWithSync then
          Synchronize(ExpireTask)
        else
          (fCurrentTask as IScheduledTask).DoExpire;
      end;
    end;
    fCurrentTask := nil;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  // Parameterless wrapper around the current task's DoExpire, so it can be
  // handed to Synchronize.
  procedure TScheduledWorker.ExpireTask;
  begin
    (fCurrentTask as IScheduledTask).DoExpire;
  end;
  1348. { TScheduler }
  // Creates the scheduler thread suspended over aTaskList (the list is
  // referenced, not copied) together with its list lock and wait event.
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  // Signals and frees the wait event, drops the task list reference (the
  // list itself is not freed here) and frees the list lock.
  destructor TScheduler.Destroy;
  begin
    //signal the event before it is freed
    fCondVar.SetEvent;
    fCondVar.Free;
    fTaskList := nil;
    fListLock.Free;
    inherited;
  end;
  // Scheduler loop: until terminated, walks the task list under the list
  // lock, launches a TScheduledWorker for every enabled, unfinished task
  // whose schedule is due, optionally removes enabled tasks that are
  // finished, then waits on the event for up to 250 before the next pass.
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    idx : Integer;
  begin
    numworker := 0;
    while not Terminated do
    begin
      fListLock.Enter;
      try
        //walk by index: removing an item while a for-in enumerator is
        //active makes the enumerator skip the element that follows it
        idx := 0;
        while idx < fTaskList.Count do
        begin
          task := fTaskList[idx];
          if (task.IsEnabled) and (not task.IsFinished) then
          begin
            if task.CheckSchedule then
            begin
              Inc(numworker);
              worker := TScheduledWorker.Create(numworker,task);
              worker.Start;
            end;
          end
          else
          begin
            if (task.IsEnabled) and fRemoveTaskAfterExpiration then
            begin
              //the next item moves into this slot, so do not advance
              fTaskList.Delete(idx);
              Continue;
            end;
          end;
          Inc(idx);
        end;
        task := nil;
      finally
        fListLock.Leave;
      end;
      fCondVar.WaitFor(250);
    end;
  end;
  // Intentionally empty: the scheduler itself performs no expire action.
  procedure TScheduler.ExpireTask;
  begin
    //nothing to do
  end;
  // Appends aTask to the task list and returns the value reported by the
  // list's Add. The list lock is not taken here.
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    Result := fTaskList.Add(aTask);
  end;
  // Returns the first task whose id equals aIdTask, or nil when no task
  // matches. The list is searched under the list lock.
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //a managed function result is not guaranteed to start out as nil, so
    //the "not found" outcome must be set explicitly
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  // Returns the first task whose name matches aTaskName (compared with
  // CompareText), or nil when no task matches. The list is searched under
  // the list lock.
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //a managed function result is not guaranteed to start out as nil, so
    //the "not found" outcome must be set explicitly
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  1444. { TAdvThread }
  // Creates the thread suspended around aProc. aSynchronize is stored as
  // the execute-with-sync flag; the thread frees itself on termination.
  constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    inherited Create(True);
    fExecuteProc := aProc;
    fExecuteWithSync := aSynchronize;
    FreeOnTerminate := True;
  end;
  // Calls the execute procedure, if one was assigned.
  procedure TAdvThread.DoExecute;
  begin
    if not Assigned(fExecuteProc) then Exit;
    fExecuteProc;
  end;
  // Calls the terminate procedure, if one was assigned.
  procedure TAdvThread.CallToTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc;
  end;
  // Runs the terminate callback, through Synchronize when the
  // terminate-with-sync flag is set and directly otherwise.
  procedure TAdvThread.DoTerminate;
  begin
    if not fTerminateWithSync then
      CallToTerminate
    else
      Synchronize(CallToTerminate);
  end;
  // Thread body: runs the execute procedure, through Synchronize when the
  // execute-with-sync flag is set and directly otherwise.
  procedure TAdvThread.Execute;
  begin
    if not fExecuteWithSync then
      DoExecute
    else
      Synchronize(DoExecute);
  end;
  // Stores the terminate callback and whether it must be run through
  // Synchronize.
  procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  begin
    fTerminateProc := aProc;
    fTerminateWithSync := aSynchronize;
  end;
  1475. end.