Quick.Threads.pas 41 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 16/01/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. {$IFNDEF FPC}
  33. System.RTLConsts,
  34. System.Generics.Collections,
  35. System.SyncObjs;
  36. {$ELSE}
  37. RtlConsts,
  38. Generics.Collections,
  39. syncobjs;
  40. {$ENDIF}
  41. type
  42. TThreadedQueueCS<T> = class
  43. private
  44. FQueue: array of T;
  45. FQueueSize, FQueueOffset: Integer;
  46. FQueueLock: TCriticalSection;
  47. {$IFDEF FPC}
  48. FQueueCondVar : TEventObject;
  49. {$ELSE}
  50. FQueueCondVar: TConditionVariableCS;
  51. {$ENDIF}
  52. FShutDown: Boolean;
  53. FPushTimeout, FPopTimeout: Cardinal;
  54. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  55. public
  56. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  57. destructor Destroy; override;
  58. procedure Grow(ADelta: Integer);
  59. function PushItem(const AItem: T): TWaitResult; overload;
  60. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  61. function PopItem: T; overload;
  62. function PopItem(var AQueueSize: Integer): T; overload;
  63. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  64. function PopItem(var AItem: T): TWaitResult; overload;
  65. procedure DoShutDown;
  66. procedure Clear;
  67. property QueueSize: Integer read FQueueSize;
  68. property ShutDown: Boolean read FShutDown;
  69. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  70. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  71. end;
  72. TThreadedQueueList<T> = class
  73. private
  74. fQueue : TQueue<T>;
  75. fQueueSize : Integer;
  76. fQueueLock : TCriticalSection;
  77. {$IFDEF FPC}
  78. FQueueCondVar : TSimpleEvent;
  79. {$ELSE}
  80. FQueueCondVar: TConditionVariableCS;
  81. {$ENDIF}
  82. fShutDown : Boolean;
  83. fPushTimeout : Cardinal;
  84. fPopTimeout : Cardinal;
  85. fTotalItemsPushed : Cardinal;
  86. fTotalItemsPopped : Cardinal;
  87. public
  88. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  89. destructor Destroy; override;
  90. procedure Grow(ADelta: Integer);
  91. function PushItem(const AItem: T): TWaitResult; overload;
  92. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  93. function PopItem: T; overload;
  94. function PopItem(var AQueueSize: Integer): T; overload;
  95. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  96. function PopItem(var AItem: T): TWaitResult; overload;
  97. procedure DoShutDown;
  98. property QueueSize: Integer read FQueueSize;
  99. property ShutDown: Boolean read FShutDown;
  100. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  101. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  102. end;
  103. {$IFNDEF FPC}
  104. TThreadObjectList<T: class> = class(TList<T>)
  105. private
  106. fList: TObjectList<T>;
  107. fLock: TObject;
  108. fDuplicates: TDuplicates;
  109. function GetItem(aIndex : Integer) : T;
  110. procedure SetItem(aIndex : Integer; aValue : T);
  111. public
  112. constructor Create(OwnedObjects : Boolean);
  113. destructor Destroy; override;
  114. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  115. procedure Add(const Item: T);
  116. procedure Clear;
  117. function LockList: TObjectList<T>;
  118. procedure Remove(const Item: T); inline;
  119. procedure RemoveItem(const Item: T; Direction: TDirection);
  120. procedure UnlockList; inline;
  121. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  122. end;
  123. {$ENDIF}
  124. {$IFDEF FPC}
  125. TProc = procedure of object;
  126. {$ENDIF}
  127. TAdvThread = class(TThread)
  128. private
  129. fExecuteProc : TProc;
  130. fTerminateProc : TProc;
  131. fExecuteWithSync : Boolean;
  132. fTerminateWithSync : Boolean;
  133. procedure DoExecute;
  134. procedure CallToTerminate;
  135. protected
  136. procedure DoTerminate; override;
  137. public
  138. constructor Create(aProc : TProc; aSynchronize : Boolean);
  139. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  140. procedure Execute; override;
  141. end;
  142. IAnonymousThread = interface
  143. procedure Start;
  144. function OnTerminate(aProc : TProc) : IAnonymousThread;
  145. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  146. end;
  147. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  148. private
  149. fThread : TAdvThread;
  150. constructor Create(aProc : TProc; aSynchronize : Boolean);
  151. public
  152. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  153. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  154. procedure Start;
  155. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  156. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  157. end;
  158. TParamArray = array of TFlexValue;
  159. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  160. TScheduleMode = (smRunOnce, smRepeatMode);
  161. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds);
  162. ITask = interface
  163. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  164. function GetParam(aIndex : Integer) : TFlexValue;
  165. function TaskStatus : TWorkTaskStatus;
  166. function GetNumWorker : Integer;
  167. procedure SetNumWorker(Value : Integer);
  168. function GetIdTask : Int64;
  169. procedure SetIdTask(Value : Int64);
  170. procedure DoExecute;
  171. procedure DoException(aException : Exception);
  172. procedure DoTerminate;
  173. property Param[index : Integer] : TFlexValue read GetParam;
  174. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  175. property IdTask : Int64 read GetIdTask;
  176. function IsEnabled : Boolean;
  177. end;
  178. {$IFNDEF FPC}
  179. TTaskProc = reference to procedure(task : ITask);
  180. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  181. {$ELSE}
  182. TTaskProc = procedure(task : ITask) of object;
  183. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  184. {$ENDIF}
  185. IWorkTask = interface(ITask)
  186. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  187. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  188. procedure Run;
  189. end;
  190. IScheduledTask = interface(ITask)
  191. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  192. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  193. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  194. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  195. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  196. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  197. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  198. function CheckSchedule : Boolean;
  199. procedure DoExpire;
  200. function GetTaskName : string;
  201. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  202. procedure RunOnce;
  203. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  204. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  205. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  206. function IsFinished : Boolean;
  207. procedure Cancel;
  208. property Name : string read GetTaskName;
  209. end;
  210. TTask = class(TInterfacedObject,ITask)
  211. private
  212. fIdTask : Int64;
  213. fNumWorker : Integer;
  214. fParamArray : TParamArray;
  215. fExecuteProc : TTaskProc;
  216. fExceptProc : TTaskExceptionProc;
  217. fTerminateProc : TTaskProc;
  218. fExpiredProc : TTaskProc;
  219. fTaskStatus : TWorkTaskStatus;
  220. fOwnedParams : Boolean;
  221. fEnabled : Boolean;
  222. fExecuteWithSync : Boolean;
  223. fExceptionWithSync : Boolean;
  224. fTerminateWithSync : Boolean;
  225. function GetParam(aIndex : Integer) : TFlexValue;
  226. procedure DoExecute;
  227. procedure DoException(aException : Exception);
  228. procedure DoTerminate;
  229. function GetNumWorker : Integer;
  230. procedure SetNumWorker(Value : Integer);
  231. function GetIdTask : Int64;
  232. procedure SetIdTask(Value : Int64);
  233. protected
  234. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  235. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  236. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  237. public
  238. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  239. destructor Destroy; override;
  240. property IdTask : Int64 read GetIdTask;
  241. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  242. function IsEnabled : Boolean;
  243. function TaskStatus : TWorkTaskStatus;
  244. end;
  245. TWorkTask = class(TTask,IWorkTask)
  246. public
  247. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  248. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  249. procedure Run; virtual;
  250. end;
  251. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  252. TScheduledTask = class(TTask,IScheduledTask)
  253. private
  254. fName : string;
  255. fExecutionTimes : Integer;
  256. fScheduleMode : TScheduleMode;
  257. fTimeInterval : Integer;
  258. fTimeMeasure : TTimeMeasure;
  259. fStartDate : TDateTime;
  260. fLastExecution : TDateTime;
  261. fNextExecution : TDateTime;
  262. fExpirationDate : TDateTime;
  263. fExpirationTimes : Integer;
  264. fFinished : Boolean;
  265. fExpireWithSync: Boolean;
  266. procedure ClearSchedule;
  267. function CheckSchedule : Boolean;
  268. procedure DoExpire;
  269. function GetTaskName : string;
  270. protected
  271. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  272. public
  273. property Name : string read fName write fName;
  274. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  275. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  276. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  277. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  278. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  279. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  280. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  281. procedure RunOnce;
  282. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  284. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  285. function IsFinished : Boolean;
  286. procedure Cancel;
  287. end;
  288. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  289. TWorker = class(TThread)
  290. private
  291. fNumWorker : Integer;
  292. fCurrentIdTask : Integer;
  293. fStatus : TWorkerStatus;
  294. fTaskQueue : TTaskQueue;
  295. procedure ExecuteTask;
  296. procedure TerminateTask;
  297. protected
  298. fCurrentTask : ITask;
  299. public
  300. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  301. property NumWorker : Integer read fNumWorker;
  302. property Status : TWorkerStatus read fStatus;
  303. procedure Execute; override;
  304. end;
  305. TScheduledWorker = class(TWorker)
  306. private
  307. procedure ExpireTask;
  308. public
  309. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  310. procedure Execute; override;
  311. end;
  312. TWorkerPool = TObjectList<TWorker>;
  313. TBackgroundTasks = class
  314. private
  315. fMaxQueue : Integer;
  316. fWorkerPool : TWorkerPool;
  317. fConcurrentWorkers : Integer;
  318. fInsertTimeout : Cardinal;
  319. fExtractTimeout : Cardinal;
  320. fTaskQueue : TTaskQueue;
  321. fNumPushedTasks : Int64;
  322. function GetTaskQueue : Cardinal;
  323. public
  324. constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  325. destructor Destroy; override;
  326. property MaxQueue : Integer read fMaxQueue;
  327. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  328. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  329. property TaskQueued : Cardinal read GetTaskQueue;
  330. property NumPushedTasks : Int64 read fNumPushedTasks;
  331. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  332. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  333. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  334. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  335. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  336. procedure Start;
  337. procedure CancelAll;
  338. end;
  339. TScheduledTaskList = TList<IScheduledTask>;
  340. TScheduler = class(TThread)
  341. private
  342. fListLock : TCriticalSection;
  343. fCondVar : TSimpleEvent;
  344. fTaskList : TScheduledTaskList;
  345. fRemoveTaskAfterExpiration : Boolean;
  346. procedure ExpireTask;
  347. public
  348. constructor Create(aTaskList : TScheduledTaskList);
  349. destructor Destroy; override;
  350. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  351. procedure Execute; override;
  352. function Add(aTask : TScheduledTask) : Integer;
  353. function Get(aIdTask : Int64) : IScheduledTask; overload;
  354. function Get(const aTaskName : string) : IScheduledTask; overload;
  355. end;
  356. TScheduledTasks = class
  357. private
  358. fTaskList : TScheduledTaskList;
  359. fScheduler : TScheduler;
  360. fNumPushedTasks : Int64;
  361. fRemoveTaskAfterExpiration : Boolean;
  362. fIsStarted : Boolean;
  363. public
  364. constructor Create;
  365. destructor Destroy; override;
  366. property NumPushedTasks : Int64 read fNumPushedTasks;
  367. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  368. property IsStarted : Boolean read fIsStarted;
  369. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  370. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  371. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  372. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  373. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  374. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  375. procedure Start;
  376. procedure Stop;
  377. end;
  378. implementation
  379. { TThreadedQueueCS<T> }
  380. procedure TThreadedQueueCS<T>.Clear;
  381. var
  382. obj : T;
  383. begin
  384. FQueueLock.Enter;
  385. try
  386. for obj in FQueue do
  387. begin
  388. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  389. end;
  390. SetLength(FQueue,0);
  391. finally
  392. FQueueLock.Leave;
  393. end;
  394. {$IFDEF FPC}
  395. FQueueCondVar.SetEvent;
  396. {$ELSE}
  397. FQueueCondVar.ReleaseAll;
  398. {$ENDIF}
  399. end;
  400. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  401. begin
  402. inherited Create;
  403. SetLength(FQueue, AQueueDepth);
  404. FQueueLock := TCriticalSection.Create;
  405. {$IFDEF FPC}
  406. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  407. {$ELSE}
  408. FQueueCondVar := TConditionVariableCS.Create;
  409. {$ENDIF}
  410. FPushTimeout := PushTimeout;
  411. FPopTimeout := PopTimeout;
  412. end;
  413. destructor TThreadedQueueCS<T>.Destroy;
  414. begin
  415. DoShutDown;
  416. FQueueLock.Free;
  417. FQueueCondVar.Free;
  418. inherited;
  419. end;
  420. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  421. begin
  422. FQueueLock.Enter;
  423. try
  424. SetLength(FQueue, Length(FQueue) + ADelta);
  425. finally
  426. FQueueLock.Leave;
  427. end;
  428. {$IFDEF FPC}
  429. FQueueCondVar.SetEvent;
  430. {$ELSE}
  431. FQueueCondVar.ReleaseAll;
  432. {$ENDIF}
  433. end;
  434. function TThreadedQueueCS<T>.PopItem: T;
  435. var
  436. LQueueSize: Integer;
  437. begin
  438. PopItem(LQueueSize, Result);
  439. end;
  440. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  441. begin
  442. AItem := Default(T);
  443. FQueueLock.Enter;
  444. try
  445. Result := wrSignaled;
  446. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  447. begin
  448. {$IFDEF FPC}
  449. Result := FQueueCondVar.WaitFor(FPopTimeout);
  450. {$ELSE}
  451. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  452. {$ENDIF}
  453. end;
  454. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  455. AItem := FQueue[FQueueOffset];
  456. FQueue[FQueueOffset] := Default(T);
  457. if FQueueSize = Length(FQueue) then
  458. begin
  459. {$IFDEF FPC}
  460. FQueueCondVar.SetEvent;
  461. {$ELSE}
  462. FQueueCondVar.ReleaseAll;
  463. {$ENDIF}
  464. end;
  465. Dec(FQueueSize);
  466. Inc(FQueueOffset);
  467. Inc(FTotalItemsPopped);
  468. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  469. finally
  470. AQueueSize := FQueueSize;
  471. FQueueLock.Leave;
  472. end;
  473. end;
  474. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  475. var
  476. LQueueSize: Integer;
  477. begin
  478. Result := PopItem(LQueueSize, AItem);
  479. end;
  480. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  481. begin
  482. PopItem(AQueueSize, Result);
  483. end;
  484. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  485. var
  486. LQueueSize: Integer;
  487. begin
  488. Result := PushItem(AItem, LQueueSize);
  489. end;
  490. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  491. begin
  492. FQueueLock.Enter;
  493. try
  494. Result := wrSignaled;
  495. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  496. begin
  497. {$IFDEF FPC}
  498. Result := FQueueCondVar.WaitFor(FPushTimeout);
  499. {$ELSE}
  500. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  501. {$ENDIF}
  502. end;
  503. if FShutDown or (Result <> wrSignaled) then Exit;
  504. if FQueueSize = 0 then
  505. begin
  506. {$IFDEF FPC}
  507. FQueueCondVar.SetEvent;
  508. {$ELSE}
  509. FQueueCondVar.ReleaseAll;
  510. {$ENDIF}
  511. end;
  512. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  513. Inc(FQueueSize);
  514. Inc(FTotalItemsPushed);
  515. finally
  516. AQueueSize := FQueueSize;
  517. FQueueLock.Leave;
  518. end;
  519. end;
  520. procedure TThreadedQueueCS<T>.DoShutDown;
  521. begin
  522. FShutDown := True;
  523. {$IFDEF FPC}
  524. FQueueCondVar.SetEvent;
  525. {$ELSE}
  526. FQueueCondVar.ReleaseAll;
  527. {$ENDIF}
  528. end;
  529. { TThreadedQueueList<T> }
  530. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  531. begin
  532. inherited Create;
  533. fQueue := TQueue<T>.Create;
  534. fQueue.Capacity := AQueueDepth;
  535. fQueueSize := 0;
  536. fQueueLock := TCriticalSection.Create;
  537. {$IFDEF FPC}
  538. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  539. {$ELSE}
  540. fQueueCondVar := TConditionVariableCS.Create;
  541. {$ENDIF}
  542. fPushTimeout := PushTimeout;
  543. fPopTimeout := PopTimeout;
  544. end;
  545. destructor TThreadedQueueList<T>.Destroy;
  546. begin
  547. DoShutDown;
  548. fQueueLock.Free;
  549. fQueueCondVar.Free;
  550. fQueue.Free;
  551. inherited;
  552. end;
  553. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  554. begin
  555. fQueueLock.Enter;
  556. try
  557. fQueue.Capacity := fQueue.Capacity + ADelta;
  558. finally
  559. fQueueLock.Leave;
  560. end;
  561. {$IFDEF FPC}
  562. FQueueCondVar.SetEvent;
  563. {$ELSE}
  564. FQueueCondVar.ReleaseAll;
  565. {$ENDIF}
  566. end;
  567. function TThreadedQueueList<T>.PopItem: T;
  568. var
  569. LQueueSize: Integer;
  570. begin
  571. PopItem(LQueueSize, Result);
  572. end;
  573. {$IFDEF FPC}
  574. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  575. //var
  576. //crono : TChronometer;
  577. begin
  578. AItem := Default(T);
  579. //crono := TChronometer.Create(False);
  580. try
  581. Result := wrSignaled;
  582. //writeln('popitem');
  583. //crono.Start;
  584. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  585. begin
  586. //crono.Start;
  587. Result := FQueueCondVar.WaitFor(FPopTimeout);
  588. //crono.Stop;
  589. //writeln('in: ' + crono.ElapsedTime);
  590. //if result = twaitresult.wrError then result := twaitresult.wrError;
  591. end;
  592. //crono.Stop;
  593. //writeln('out: ' + crono.ElapsedTime);
  594. fQueueLock.Enter;
  595. try
  596. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  597. AItem := fQueue.Extract;
  598. Dec(FQueueSize);
  599. Inc(fTotalItemsPopped);
  600. finally
  601. fQueueLock.Leave;
  602. end;
  603. finally
  604. AQueueSize := fQueueSize;
  605. end;
  606. end;
  607. {$ELSE}
  608. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  609. begin
  610. AItem := Default(T);
  611. fQueueLock.Enter;
  612. try
  613. Result := wrSignaled;
  614. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  615. begin
  616. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  617. end;
  618. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  619. AItem := fQueue.Extract;
  620. if fQueueSize = fQueue.Count then
  621. begin
  622. FQueueCondVar.ReleaseAll;
  623. end;
  624. Dec(FQueueSize);
  625. Inc(fTotalItemsPopped);
  626. finally
  627. AQueueSize := fQueueSize;
  628. fQueueLock.Leave;
  629. end;
  630. end;
  631. {$ENDIF}
  632. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  633. var
  634. LQueueSize: Integer;
  635. begin
  636. Result := PopItem(LQueueSize, AItem);
  637. end;
  638. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  639. begin
  640. PopItem(AQueueSize, Result);
  641. end;
  642. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  643. var
  644. LQueueSize: Integer;
  645. begin
  646. Result := PushItem(AItem, LQueueSize);
  647. end;
  648. {$IFDEF FPC}
  649. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  650. begin
  651. FQueueLock.Enter;
  652. try
  653. Result := wrSignaled;
  654. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  655. //begin
  656. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  657. //end;
  658. if fShutDown or (Result <> wrSignaled) then Exit;
  659. //if fQueueSize = 0 then
  660. //begin
  661. // FQueueCondVar.SetEvent;
  662. //end;
  663. fQueue.Enqueue(AItem);
  664. Inc(FQueueSize);
  665. Inc(fTotalItemsPushed);
  666. finally
  667. AQueueSize := fQueueSize;
  668. FQueueLock.Leave;
  669. //FQueueCondVar.SetEvent;
  670. end;
  671. end;
  672. {$ELSE}
  673. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  674. begin
  675. FQueueLock.Enter;
  676. try
  677. Result := wrSignaled;
  678. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  679. //begin
  680. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  681. //end;
  682. if fShutDown or (Result <> wrSignaled) then Exit;
  683. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  684. fQueue.Enqueue(AItem);
  685. Inc(FQueueSize);
  686. Inc(fTotalItemsPushed);
  687. finally
  688. AQueueSize := fQueueSize;
  689. FQueueLock.Leave;
  690. end;
  691. end;
  692. {$ENDIF}
  693. procedure TThreadedQueueList<T>.DoShutDown;
  694. begin
  695. fShutDown := True;
  696. {$IFDEF FPC}
  697. FQueueCondVar.SetEvent;
  698. {$ELSE}
  699. FQueueCondVar.ReleaseAll;
  700. {$ENDIF}
  701. end;
  702. {$IFNDEF FPC}
  703. { TThreadObjectList<T> }
  704. procedure TThreadObjectList<T>.Add(const Item: T);
  705. begin
  706. LockList;
  707. try
  708. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  709. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  710. finally
  711. UnlockList;
  712. end;
  713. end;
  714. procedure TThreadObjectList<T>.Clear;
  715. begin
  716. LockList;
  717. try
  718. fList.Clear;
  719. finally
  720. UnlockList;
  721. end;
  722. end;
  723. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  724. begin
  725. inherited Create;
  726. fLock := TObject.Create;
  727. fList := TObjectList<T>.Create;
  728. fDuplicates := dupIgnore;
  729. end;
  730. destructor TThreadObjectList<T>.Destroy;
  731. begin
  732. LockList;
  733. try
  734. fList.Free;
  735. inherited Destroy;
  736. finally
  737. UnlockList;
  738. fLock.Free;
  739. end;
  740. end;
  741. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  742. begin
  743. LockList;
  744. try
  745. Result := fList[aIndex];
  746. finally
  747. UnlockList;
  748. end;
  749. end;
  750. function TThreadObjectList<T>.LockList: TObjectList<T>;
  751. begin
  752. System.TMonitor.Enter(fLock);
  753. Result := fList;
  754. end;
  755. procedure TThreadObjectList<T>.Remove(const Item: T);
  756. begin
  757. RemoveItem(Item, TDirection.FromBeginning);
  758. end;
  759. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  760. begin
  761. LockList;
  762. try
  763. fList.RemoveItem(Item, Direction);
  764. finally
  765. UnlockList;
  766. end;
  767. end;
  768. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  769. begin
  770. LockList;
  771. try
  772. fList[aIndex] := aValue;
  773. finally
  774. UnlockList;
  775. end;
  776. end;
  777. procedure TThreadObjectList<T>.UnlockList;
  778. begin
  779. System.TMonitor.Exit(fLock);
  780. end;
  781. {$ENDIF}
  782. { TAnonymousThread }
  783. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  784. begin
  785. fThread := TAdvThread.Create(aProc,aSynchronize);
  786. end;
  787. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  788. begin
  789. Result := TAnonymousThread.Create(aProc,False);
  790. end;
  791. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  792. begin
  793. Result := TAnonymousThread.Create(aProc,True);
  794. end;
  795. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  796. begin
  797. Result := Self;
  798. fThread.OnTerminate(aProc,False);
  799. end;
  800. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  801. begin
  802. Result := Self;
  803. fThread.OnTerminate(aProc,True);
  804. end;
  805. procedure TAnonymousThread.Start;
  806. begin
  807. fThread.Start;
  808. end;
  809. { TTask }
  810. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  811. var
  812. i : Integer;
  813. begin
  814. fTaskStatus := TWorkTaskStatus.wtsPending;
  815. fExecuteWithSync := False;
  816. fTerminateWithSync := False;
  817. fExceptionWithSync := False;
  818. fOwnedParams := aOwnedParams;
  819. SetLength(fParamArray,High(aParamArray)+1);
  820. for i := Low(aParamArray) to High(aParamArray) do
  821. begin
  822. fParamArray[i].Create(aParamArray[i]);
  823. {$IFDEF FPC}
  824. fParamArray[i]._AddRef;
  825. {$ENDIF}
  826. end;
  827. fExecuteProc := aTaskProc;
  828. fEnabled := False;
  829. end;
  830. destructor TTask.Destroy;
  831. var
  832. i : Integer;
  833. begin
  834. //free passed params
  835. if fOwnedParams then
  836. begin
  837. for i := Low(fParamArray) to High(fParamArray) do
  838. begin
  839. {$IFDEF FPC}
  840. fParamArray[i]._Release;
  841. {$ENDIF}
  842. if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
  843. end;
  844. end;
  845. fParamArray := nil;
  846. inherited;
  847. end;
  848. procedure TTask.DoException(aException : Exception);
  849. begin
  850. fTaskStatus := TWorkTaskStatus.wtsException;
  851. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  852. else raise aException;
  853. end;
  854. procedure TTask.DoExecute;
  855. begin
  856. fTaskStatus := TWorkTaskStatus.wtsRunning;
  857. if Assigned(fExecuteProc) then fExecuteProc(Self);
  858. fTaskStatus := TWorkTaskStatus.wtsDone;
  859. end;
  860. procedure TTask.DoTerminate;
  861. begin
  862. if Assigned(fTerminateProc) then fTerminateProc(Self);
  863. end;
  864. function TTask.GetIdTask: Int64;
  865. begin
  866. Result := fIdTask;
  867. end;
  868. procedure TTask.SetIdTask(Value : Int64);
  869. begin
  870. fIdTask := Value;
  871. end;
  872. function TTask.GetNumWorker: Integer;
  873. begin
  874. Result := fNumWorker;
  875. end;
  876. function TTask.GetParam(aIndex: Integer): TFlexValue;
  877. begin
  878. Result := fParamArray[aIndex];
  879. end;
  880. function TTask.IsEnabled: Boolean;
  881. begin
  882. Result := fEnabled;
  883. end;
  884. procedure TTask.SetNumWorker(Value: Integer);
  885. begin
  886. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  887. fNumWorker := Value;
  888. end;
  889. function TTask.TaskStatus: TWorkTaskStatus;
  890. begin
  891. Result := fTaskStatus;
  892. end;
  893. { TWorkTask }
  894. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  895. begin
  896. fExceptProc := aTaskProc;
  897. Result := Self;
  898. end;
  899. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  900. begin
  901. fTerminateProc := aTaskProc;
  902. Result := Self;
  903. end;
  904. procedure TWorkTask.Run;
  905. begin
  906. fEnabled := True;
  907. end;
  908. { TBackgroundTasks }
  909. procedure TBackgroundTasks.CancelAll;
  910. begin
  911. fTaskQueue.Clear;
  912. end;
  913. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  914. begin
  915. fMaxQueue := aMaxQueue;
  916. fConcurrentWorkers := aConcurrentWorkers;
  917. fInsertTimeout := INFINITE;
  918. fExtractTimeout := 5000;
  919. fNumPushedTasks := 0;
  920. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  921. end;
  922. destructor TBackgroundTasks.Destroy;
  923. begin
  924. if Assigned(fWorkerPool) then fWorkerPool.Free;
  925. if Assigned(fTaskQueue) then fTaskQueue.Free;
  926. inherited;
  927. end;
  928. function TBackgroundTasks.GetTaskQueue: Cardinal;
  929. begin
  930. Result := fTaskQueue.QueueSize;
  931. end;
  932. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  933. begin
  934. Result := AddTask([],False,aTaskProc);
  935. end;
  936. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  937. var
  938. worktask : IWorkTask;
  939. begin
  940. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  941. Inc(fNumPushedTasks);
  942. worktask.SetIdTask(fNumPushedTasks);
  943. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  944. begin
  945. Result := worktask;
  946. end;
  947. end;
  948. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  949. begin
  950. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  951. TTask(Result).ExecuteWithSync := True;
  952. end;
  953. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  954. begin
  955. Result := AddTask_Sync([],False,aTaskProc);
  956. end;
  957. procedure TBackgroundTasks.Start;
  958. var
  959. i : Integer;
  960. worker : TWorker;
  961. begin
  962. //create workers
  963. if fWorkerPool <> nil then fWorkerPool.Free;
  964. fWorkerPool := TWorkerPool.Create(True);
  965. for i := 1 to fConcurrentWorkers do
  966. begin
  967. worker := TWorker.Create(i,fTaskQueue);
  968. fWorkerPool.Add(worker);
  969. worker.Start;
  970. end;
  971. end;
  972. { TWorker }
  973. constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  974. begin
  975. inherited Create(True);
  976. fNumWorker := aNumWorker;
  977. fStatus := TWorkerStatus.wsSuspended;
  978. fTaskQueue := aTaskQueue;
  979. FreeOnTerminate := False;
  980. end;
  981. procedure TWorker.ExecuteTask;
  982. begin
  983. fCurrentTask.DoExecute;
  984. end;
  985. procedure TWorker.TerminateTask;
  986. begin
  987. fCurrentTask.DoTerminate;
  988. end;
  989. procedure TWorker.Execute;
  990. begin
  991. fStatus := TWorkerStatus.wsIdle;
  992. while (not Terminated) and (fTaskQueue.QueueSize > 0) do
  993. begin
  994. fCurrentTask := fTaskQueue.PopItem;
  995. if fCurrentTask <> nil then
  996. try
  997. fStatus := TWorkerStatus.wsWorking;
  998. try
  999. fCurrentIdTask := fCurrentTask.GetIdTask;
  1000. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1001. else fCurrentTask.DoExecute;
  1002. except
  1003. on E : Exception do
  1004. begin
  1005. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1006. else raise Exception.Create(e.Message);
  1007. end;
  1008. end;
  1009. finally
  1010. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1011. else fCurrentTask.DoTerminate;
  1012. fStatus := TWorkerStatus.wsIdle;
  1013. end;
  1014. end;
  1015. fStatus := TWorkerStatus.wsSuspended
  1016. end;
  1017. { TScheduledTasks }
  1018. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1019. begin
  1020. Result := AddTask(aTaskName,[],False,aTaskProc);
  1021. end;
  1022. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1023. var
  1024. scheduletask : TScheduledTask;
  1025. begin
  1026. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1027. scheduletask.Name := aTaskName;
  1028. Inc(fNumPushedTasks);
  1029. scheduletask.SetIdTask(fNumPushedTasks);
  1030. fTaskList.Add(scheduletask);
  1031. Result := scheduletask;
  1032. end;
  1033. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1034. begin
  1035. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1036. TTask(Result).ExecuteWithSync := True;
  1037. end;
  1038. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1039. begin
  1040. Result := AddTask_Sync(aTaskName,aTaskProc);
  1041. end;
  1042. constructor TScheduledTasks.Create;
  1043. begin
  1044. fNumPushedTasks := 0;
  1045. fIsStarted := False;
  1046. fTaskList := TScheduledTaskList.Create;
  1047. end;
  1048. destructor TScheduledTasks.Destroy;
  1049. begin
  1050. if Assigned(fScheduler) then
  1051. begin
  1052. fScheduler.Terminate;
  1053. fScheduler.WaitFor;
  1054. fScheduler.Free;
  1055. end;
  1056. if Assigned(fTaskList) then fTaskList.Free;
  1057. inherited;
  1058. end;
  1059. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1060. begin
  1061. Result := fScheduler.Get(aIdTask);
  1062. end;
  1063. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1064. begin
  1065. Result := fScheduler.Get(aTaskName);
  1066. end;
  1067. procedure TScheduledTasks.Start;
  1068. begin
  1069. if fIsStarted then Exit;
  1070. if not Assigned(fScheduler) then
  1071. begin
  1072. fScheduler := TScheduler.Create(fTaskList);
  1073. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1074. end;
  1075. fScheduler.Start;
  1076. fIsStarted := True;
  1077. end;
  1078. procedure TScheduledTasks.Stop;
  1079. begin
  1080. if Assigned(fScheduler) then fScheduler.Terminate;
  1081. fIsStarted := False;
  1082. end;
  1083. { TScheduledTask }
  1084. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1085. begin
  1086. Result := Self;
  1087. ClearSchedule;
  1088. fScheduleMode := TScheduleMode.smRunOnce;
  1089. fStartDate := aStartDate;
  1090. fNextExecution := aStartDate;
  1091. end;
  1092. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1093. begin
  1094. if fStartDate = 0.0 then ClearSchedule;
  1095. fScheduleMode := TScheduleMode.smRepeatMode;
  1096. fTimeMeasure := aTimeMeasure;
  1097. fTimeInterval := aInterval;
  1098. if fStartDate = 0.0 then fStartDate := Now();
  1099. fNextExecution := fStartDate;
  1100. fEnabled := True;
  1101. end;
  1102. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1103. begin
  1104. if fStartDate = 0.0 then ClearSchedule;
  1105. fScheduleMode := TScheduleMode.smRepeatMode;
  1106. fTimeMeasure := aTimeMeasure;
  1107. fTimeInterval := aInterval;
  1108. if fStartDate = 0.0 then fStartDate := Now();
  1109. fExpirationDate := aEndTime;
  1110. fNextExecution := fStartDate;
  1111. fEnabled := True;
  1112. end;
  1113. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1114. begin
  1115. if fStartDate = 0.0 then ClearSchedule;
  1116. fScheduleMode := TScheduleMode.smRepeatMode;
  1117. fTimeMeasure := aTimeMeasure;
  1118. fTimeInterval := aInterval;
  1119. if fStartDate = 0.0 then fStartDate := Now();
  1120. fExpirationTimes := aRepeatTimes;
  1121. fNextExecution := fStartDate;
  1122. fEnabled := True;
  1123. end;
  1124. procedure TScheduledTask.RunOnce;
  1125. begin
  1126. fScheduleMode := TScheduleMode.smRunOnce;
  1127. if fStartDate = 0.0 then fStartDate := Now();
  1128. fNextExecution := fStartDate;
  1129. fEnabled := True;
  1130. end;
  1131. procedure TScheduledTask.Cancel;
  1132. begin
  1133. fFinished := True;
  1134. end;
  1135. function TScheduledTask.CheckSchedule: Boolean;
  1136. begin
  1137. Result := False;
  1138. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1139. if fScheduleMode = TScheduleMode.smRunOnce then
  1140. begin
  1141. //if start date reached
  1142. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1143. begin
  1144. fLastExecution := Now();
  1145. Inc(fExecutionTimes);
  1146. fFinished := True;
  1147. Result := True;
  1148. end;
  1149. end
  1150. else
  1151. begin
  1152. //if next execution reached
  1153. if Now() >= fNextExecution then
  1154. begin
  1155. //check expiration limits
  1156. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1157. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1158. begin
  1159. fFinished := True;
  1160. Exit;
  1161. end;
  1162. //calculate next execution
  1163. case fTimeMeasure of
  1164. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1165. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1166. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1167. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1168. end;
  1169. fLastExecution := Now();
  1170. Inc(fExecutionTimes);
  1171. Result := True;
  1172. end;
  1173. end;
  1174. end;
  1175. procedure TScheduledTask.ClearSchedule;
  1176. begin
  1177. inherited;
  1178. fFinished := False;
  1179. fStartDate := 0.0;
  1180. fLastExecution := 0.0;
  1181. fNextExecution := 0.0;
  1182. fExpirationDate := 0.0;
  1183. fScheduleMode := TScheduleMode.smRunOnce;
  1184. fTimeMeasure := TTimeMeasure.tmSeconds;
  1185. fTimeInterval := 0;
  1186. end;
  1187. procedure TScheduledTask.DoExpire;
  1188. begin
  1189. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1190. fEnabled := False;
  1191. end;
  1192. function TScheduledTask.GetTaskName: string;
  1193. begin
  1194. Result := fName;
  1195. end;
  1196. function TScheduledTask.IsFinished: Boolean;
  1197. begin
  1198. Result := fFinished;
  1199. end;
  1200. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1201. begin
  1202. fExceptProc := aTaskProc;
  1203. Result := Self;
  1204. end;
  1205. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1206. begin
  1207. Result := OnException(aTaskProc);
  1208. TTask(Result).ExceptionWithSync := True;
  1209. end;
  1210. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1211. begin
  1212. fExpiredProc := aTaskProc;
  1213. Result := Self;
  1214. end;
  1215. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1216. begin
  1217. Result := OnExpired(aTaskProc);
  1218. TScheduledTask(Result).ExpireWithSync := True;
  1219. end;
  1220. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1221. begin
  1222. fTerminateProc := aTaskProc;
  1223. Result := Self;
  1224. end;
  1225. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1226. begin
  1227. Result := OnTerminated(aTaskProc);
  1228. TTask(Result).TerminateWithSync := True;
  1229. end;
  1230. { TScheduledWorker }
  1231. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1232. begin
  1233. inherited Create(aNumWorker,nil);
  1234. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1235. FreeOnTerminate := True;
  1236. fCurrentTask := aScheduledTask;
  1237. end;
  1238. procedure TScheduledWorker.Execute;
  1239. begin
  1240. fStatus := TWorkerStatus.wsIdle;
  1241. if Assigned(fCurrentTask) then
  1242. begin
  1243. try
  1244. fStatus := TWorkerStatus.wsWorking;
  1245. try
  1246. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1247. else fCurrentTask.DoExecute;
  1248. fStatus := TWorkerStatus.wsIdle;
  1249. except
  1250. on E : Exception do
  1251. begin
  1252. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1253. else raise Exception.Create(e.Message);
  1254. end;
  1255. end;
  1256. finally
  1257. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1258. else fCurrentTask.DoTerminate;
  1259. //check if expired
  1260. if (fCurrentTask as IScheduledTask).IsFinished then
  1261. begin
  1262. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1263. else (fCurrentTask as IScheduledTask).DoExpire;
  1264. end;
  1265. end;
  1266. end;
  1267. fCurrentTask := nil;
  1268. fStatus := TWorkerStatus.wsSuspended;
  1269. end;
  1270. procedure TScheduledWorker.ExpireTask;
  1271. begin
  1272. (fCurrentTask as IScheduledTask).DoExpire;
  1273. end;
  1274. { TScheduler }
  1275. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1276. begin
  1277. inherited Create(True);
  1278. FreeOnTerminate := False;
  1279. fListLock := TCriticalSection.Create;
  1280. fRemoveTaskAfterExpiration := False;
  1281. fTaskList := aTaskList;
  1282. {$IFDEF FPC}
  1283. fCondVar := TSimpleEvent.Create;
  1284. {$ELSE}
  1285. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1286. {$ENDIF}
  1287. end;
  1288. destructor TScheduler.Destroy;
  1289. begin
  1290. fCondVar.SetEvent;
  1291. fCondVar.Free;
  1292. fTaskList := nil;
  1293. fListLock.Free;
  1294. inherited;
  1295. end;
  1296. procedure TScheduler.Execute;
  1297. var
  1298. task : IScheduledTask;
  1299. worker : TScheduledWorker;
  1300. numworker : Int64;
  1301. begin
  1302. numworker := 0;
  1303. while not Terminated do
  1304. begin
  1305. fListLock.Enter;
  1306. try
  1307. for task in fTaskList do
  1308. begin
  1309. if (task.IsEnabled) and (not task.IsFinished) then
  1310. begin
  1311. if task.CheckSchedule then
  1312. begin
  1313. Inc(numworker);
  1314. worker := TScheduledWorker.Create(numworker,task);
  1315. worker.Start;
  1316. end;
  1317. end
  1318. else
  1319. begin
  1320. if task.IsEnabled then
  1321. begin
  1322. //if TScheduledTask(task).ExpireWithSync then Synchronize(ExpireTask)
  1323. // else task.DoExpire;
  1324. if fRemoveTaskAfterExpiration then fTaskList.Remove(task);
  1325. end;
  1326. end;
  1327. end;
  1328. task := nil;
  1329. finally
  1330. fListLock.Leave;
  1331. end;
  1332. fCondVar.WaitFor(250);
  1333. end;
  1334. end;
  1335. procedure TScheduler.ExpireTask;
  1336. begin
  1337. end;
  1338. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1339. begin
  1340. Result := fTaskList.Add(aTask);
  1341. end;
  1342. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1343. var
  1344. task : IScheduledTask;
  1345. begin
  1346. fListLock.Enter;
  1347. try
  1348. for task in fTaskList do
  1349. begin
  1350. if task.IdTask = aIdTask then Exit(task);
  1351. end;
  1352. finally
  1353. fListLock.Leave;
  1354. end;
  1355. end;
  1356. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1357. var
  1358. task : IScheduledTask;
  1359. begin
  1360. fListLock.Enter;
  1361. try
  1362. for task in fTaskList do
  1363. begin
  1364. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1365. end;
  1366. finally
  1367. fListLock.Leave;
  1368. end;
  1369. end;
  1370. { TAdvThread }
  1371. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1372. begin
  1373. inherited Create(True);
  1374. FreeOnTerminate := True;
  1375. fExecuteWithSync := aSynchronize;
  1376. fExecuteProc := aProc;
  1377. end;
  1378. procedure TAdvThread.DoExecute;
  1379. begin
  1380. if Assigned(fExecuteProc) then fExecuteProc;
  1381. end;
  1382. procedure TAdvThread.CallToTerminate;
  1383. begin
  1384. if Assigned(fTerminateProc) then fTerminateProc;
  1385. end;
  1386. procedure TAdvThread.DoTerminate;
  1387. begin
  1388. if fTerminateWithSync then Synchronize(CallToTerminate)
  1389. else CallToTerminate;
  1390. end;
  1391. procedure TAdvThread.Execute;
  1392. begin
  1393. if fExecuteWithSync then Synchronize(DoExecute)
  1394. else DoExecute;
  1395. end;
  1396. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1397. begin
  1398. fTerminateWithSync := aSynchronize;
  1399. fTerminateProc := aProc;
  1400. end;
  1401. end.