Quick.Threads.pas 36 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.4
  7. Created : 09/03/2018
  8. Modified : 14/01/2019
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. //rtti,
  27. Types,
  28. SysUtils,
  29. DateUtils,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. {$IFNDEF FPC}
  33. System.RTLConsts,
  34. System.Generics.Collections,
  35. System.SyncObjs;
  36. {$ELSE}
  37. RtlConsts,
  38. Generics.Collections,
  39. syncobjs;
  40. {$ENDIF}
  41. type
  // Bounded, lock-protected FIFO queue. Items are stored in a circular buffer
  // (FQueue) guarded by FQueueLock; FQueueCondVar is used to wake threads that
  // are waiting inside PushItem or PopItem.
  TThreadedQueueCS<T> = class
  private
    FQueue: array of T;             // circular storage; Length(FQueue) is the capacity
    FQueueSize: Integer;            // number of items currently stored
    FQueueOffset: Integer;          // index of the oldest stored item
    FQueueLock: TCriticalSection;
    {$IFDEF FPC}
    FQueueCondVar: TEventObject;
    {$ELSE}
    FQueueCondVar: TConditionVariableCS;
    {$ENDIF}
    FShutDown: Boolean;             // set by DoShutDown
    FPushTimeout: Cardinal;         // timeout handed to WaitFor while the queue is full
    FPopTimeout: Cardinal;          // timeout handed to WaitFor while the queue is empty
    FTotalItemsPushed: Cardinal;
    FTotalItemsPopped: Cardinal;
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    // Changes the capacity by ADelta slots.
    procedure Grow(ADelta: Integer);
    // Enqueue; the second overload also reports the item count after the call.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    // Dequeue; the overloads differ only in which values are returned to the caller.
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    // Flags the queue as shut down and wakes waiting threads.
    procedure DoShutDown;
    procedure Clear;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  // Lock-protected FIFO queue backed by a TQueue<T>. The item count is tracked
  // separately in fQueueSize so it can be read through the QueueSize property.
  TThreadedQueueList<T> = class
  private
    fQueue: TQueue<T>;
    fQueueSize: Integer;            // number of items currently stored
    fQueueLock: TCriticalSection;
    {$IFDEF FPC}
    FQueueCondVar: TSimpleEvent;
    {$ELSE}
    FQueueCondVar: TConditionVariableCS;
    {$ENDIF}
    fShutDown: Boolean;             // set by DoShutDown
    fPushTimeout: Cardinal;
    fPopTimeout: Cardinal;          // timeout handed to WaitFor while the queue is empty
    fTotalItemsPushed: Cardinal;
    fTotalItemsPopped: Cardinal;
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    // Raises the capacity of the underlying TQueue<T> by ADelta.
    procedure Grow(ADelta: Integer);
    // Enqueue; the second overload also reports the item count after the call.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    // Dequeue; the overloads differ only in which values are returned to the caller.
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    // Flags the queue as shut down and wakes waiting threads.
    procedure DoShutDown;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  {$IFNDEF FPC}
  // Object list whose operations are serialized with a TMonitor taken on fLock.
  // All items are kept in the inner fList; LockList/UnlockList give callers
  // direct access to it while the monitor is held.
  TThreadObjectList<T: class> = class(TList<T>)
  private
    fList: TObjectList<T>;          // actual storage
    fLock: TObject;                 // monitor target
    fDuplicates: TDuplicates;       // policy applied by Add
    function GetItem(aIndex : Integer) : T;
    procedure SetItem(aIndex : Integer; aValue : T);
  public
    constructor Create(OwnedObjects : Boolean);
    destructor Destroy; override;
    property Items[Index : Integer] : T read GetItem write SetItem ; default;
    procedure Add(const Item: T);
    procedure Clear;
    // Enters the monitor and returns the inner list; pair with UnlockList.
    function LockList: TObjectList<T>;
    procedure Remove(const Item: T); inline;
    procedure RemoveItem(const Item: T; Direction: TDirection);
    procedure UnlockList; inline;
    property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  end;
  {$ENDIF}
  {$IFDEF FPC}
  // Parameterless method pointer, declared only when compiling with FPC.
  // NOTE(review): presumably stands in for Delphi's SysUtils.TProc - confirm.
  TProc = procedure of object;
  {$ENDIF}
  // Fluent handle for a thread created through TAnonymousThread.Execute.
  IAnonymousThread = interface
    // Starts the wrapped thread.
    procedure Start;
    // Registers aProc to be called when the thread terminates; returns the
    // same handle so calls can be chained.
    function OnTerminate(aProc : TProc) : IAnonymousThread;
  end;
  // Wraps a thread obtained from TThread.CreateAnonymousThread behind the
  // IAnonymousThread interface. Instances are created through Execute; the
  // constructor is private.
  TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  private
    fThread : TThread;              // thread created in the constructor
    fTerminateProc : TProc;         // callback stored by OnTerminate
    constructor Create(aProc : TProc);
    // TThread.OnTerminate handler; forwards to fTerminateProc.
    procedure NotifyTerminate(Sender : TObject);
  public
    class function Execute(aProc : TProc) : IAnonymousThread;
    procedure Start;
    function OnTerminate(aProc : TProc) : IAnonymousThread;
  end;
  // Parameter values captured by a task at creation time.
  TParamArray = array of TFlexValue;

  // Life-cycle states reported by ITask.TaskStatus.
  TWorkTaskStatus = (
    wtsPending,      // initial state set by TTask.Create
    wtsAssigned,     // set by TTask.SetNumWorker
    wtsRunning,      // set when TTask.DoExecute starts
    wtsDone,         // set when TTask.DoExecute returns normally
    wtsException     // set by TTask.DoException
  );

  TScheduleMode = (
    smRunOnce,
    smRepeatMode
  );

  // Unit applied to the interval passed to RepeatEvery.
  TTimeMeasure = (
    tmDays,
    tmHours,
    tmMinutes,
    tmSeconds
  );
  // Common contract of every task handled by the workers and the scheduler.
  ITask = interface
  ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
    // Property accessors
    function GetParam(aIndex : Integer) : TFlexValue;
    function TaskStatus : TWorkTaskStatus;
    function GetNumWorker : Integer;
    procedure SetNumWorker(Value : Integer);
    function GetIdTask : Int64;
    procedure SetIdTask(Value : Int64);
    // Invoked by the executing worker: run the task body, report an
    // exception, signal termination.
    procedure DoExecute;
    procedure DoException(aException : Exception);
    procedure DoTerminate;
    property Param[index : Integer] : TFlexValue read GetParam;
    property NumWorker : Integer read GetNumWorker write SetNumWorker;
    property IdTask : Int64 read GetIdTask;
    function IsEnabled : Boolean;
  end;
  // Callback signatures used by tasks: anonymous-method references on Delphi,
  // method pointers on FPC.
  {$IFNDEF FPC}
  TTaskProc = reference to procedure(task : ITask);
  TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  {$ELSE}
  TTaskProc = procedure(task : ITask) of object;
  TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  {$ENDIF}
  // Task that is queued for execution by a background worker. The On...
  // methods return IWorkTask so they can be chained.
  IWorkTask = interface(ITask)
    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
    procedure Run;
  end;
  // Task with a schedule attached. The On... methods and StartAt return
  // IScheduledTask so they can be chained.
  IScheduledTask = interface(ITask)
  ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
    // Callback registration
    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
    function CheckSchedule : Boolean;
    procedure DoExpire;
    function GetTaskName : string;
    // Schedule configuration
    function StartAt(aStartDate : TDateTime) : IScheduledTask;
    procedure RunOnce;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
    function IsFinished : Boolean;
    procedure Cancel;
    property Name : string read GetTaskName;
  end;
  // Base implementation of ITask: stores the parameters, the callbacks and the
  // status of a single task.
  TTask = class(TInterfacedObject,ITask)
  private
    fIdTask : Int64;
    fNumWorker : Integer;
    fParamArray : TParamArray;          // parameters copied in Create
    fExecuteProc : TTaskProc;           // task body, called by DoExecute
    fExceptProc : TTaskExceptionProc;   // optional, called by DoException
    fTerminateProc : TTaskProc;         // optional, called by DoTerminate
    fExpiredProc : TTaskProc;
    fTaskStatus : TWorkTaskStatus;
    fOwnedParams : Boolean;             // when True, Destroy frees object parameters
    fEnabled : Boolean;
    function GetParam(aIndex : Integer) : TFlexValue;
    procedure DoExecute;
    procedure DoException(aException : Exception);
    procedure DoTerminate;
    function GetNumWorker : Integer;
    procedure SetNumWorker(Value : Integer);
    function GetIdTask : Int64;
    procedure SetIdTask(Value : Int64);
  public
    constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
    destructor Destroy; override;
    property IdTask : Int64 read GetIdTask;
    property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
    function IsEnabled : Boolean;
    function TaskStatus : TWorkTaskStatus;
  end;
  // TTask specialization exposed through IWorkTask.
  TWorkTask = class(TTask,IWorkTask)
  public
    // Store the callback and return Self for chaining.
    function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
    function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
    // Marks the task as enabled.
    procedure Run; virtual;
  end;
  // Queue type shared between TBackgroundTasks and its workers.
  TTaskQueue = TThreadedQueueCS<IWorkTask>;
  // TTask specialization exposed through IScheduledTask; carries the schedule
  // state alongside the inherited task data.
  TScheduledTask = class(TTask,IScheduledTask)
  private
    fName : string;
    fExecutionTimes : Integer;
    fScheduleMode : TScheduleMode;
    fTimeInterval : Integer;
    fTimeMeasure : TTimeMeasure;
    fStartDate : TDateTime;
    fLastExecution : TDateTime;
    fNextExecution : TDateTime;
    fExpirationDate : TDateTime;
    fExpirationTimes : Integer;
    fFinished : Boolean;
    procedure ClearSchedule;
    function CheckSchedule : Boolean;
    procedure DoExpire;
    function GetTaskName : string;
  public
    property Name : string read fName write fName;
    // Callback registration
    function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
    function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
    // Schedule configuration
    function StartAt(aStartDate : TDateTime) : IScheduledTask;
    procedure RunOnce;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
    procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
    function IsFinished : Boolean;
    procedure Cancel;
  end;
  // State reported by TWorker.Status.
  TWorkerStatus = (
    wsIdle,        // inside Execute, not running a task
    wsWorking,     // running a task
    wsSuspended    // set by the constructor and again when Execute finishes
  );
  // Thread that pops tasks from a shared TTaskQueue and executes them.
  TWorker = class(TThread)
  private
    fNumWorker : Integer;
    fCurrentIdTask : Integer;
    fCurrentTask : IWorkTask;       // task most recently popped from the queue
    fStatus : TWorkerStatus;
    fTaskQueue : TTaskQueue;        // not owned; supplied to the constructor
  public
    constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
    property NumWorker : Integer read fNumWorker;
    property Status : TWorkerStatus read fStatus;
    procedure Execute; override;
  end;
  // Worker thread constructed around a single scheduled task.
  TScheduledWorker = class(TWorker)
  private
    fTask : IScheduledTask;
  public
    constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
    procedure Execute; override;
  end;
  // Container type used by TBackgroundTasks to hold its workers.
  TWorkerPool = TObjectList<TWorker>;
  // Runs queued work tasks on a pool of TWorker threads that share one queue.
  TBackgroundTasks = class
  private
    fMaxQueue : Integer;            // queue capacity given to the constructor
    fWorkerPool : TWorkerPool;      // created by Start
    fConcurrentWorkers : Integer;   // number of workers created by Start
    fInsertTimeout : Cardinal;
    fExtractTimeout : Cardinal;
    fTaskQueue : TTaskQueue;
    fNumPushedTasks : Int64;        // also used as the source of task ids
    function GetTaskQueue : Cardinal;
  public
    constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
    destructor Destroy; override;
    property MaxQueue : Integer read fMaxQueue;
    property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
    property TaskQueued : Cardinal read GetTaskQueue;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
    // Creates a task and pushes it onto the queue.
    function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
    function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
    // (Re)creates the worker pool and starts every worker.
    procedure Start;
    // Clears the task queue.
    procedure CancelAll;
  end;
  // List type holding the scheduled tasks.
  TScheduledTaskList = TList<IScheduledTask>;
  // Thread working on a TScheduledTaskList handed to its constructor.
  TScheduler = class(TThread)
  private
    fListLock : TCriticalSection;
    fCondVar : TSimpleEvent;
    fTaskList : TScheduledTaskList;
    fRemoveTaskAfterExpiration : Boolean;
  public
    constructor Create(aTaskList : TScheduledTaskList);
    destructor Destroy; override;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    procedure Execute; override;
    function Add(aTask : TScheduledTask) : Integer;
    // Task lookup by id or by name.
    function Get(aIdTask : Int64) : IScheduledTask; overload;
    function Get(const aTaskName : string) : IScheduledTask; overload;
  end;
  // Front end for scheduled tasks: owns the task list and the TScheduler thread.
  TScheduledTasks = class
  private
    fTaskList : TScheduledTaskList;
    fScheduler : TScheduler;
    fNumPushedTasks : Int64;        // also used as the source of task ids
    fRemoveTaskAfterExpiration : Boolean;
    fIsStarted : Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    property NumPushedTasks : Int64 read fNumPushedTasks;
    property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
    property IsStarted : Boolean read fIsStarted;
    // Creates a named task and appends it to the task list.
    function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
    function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
    // Task lookup by id or by name.
    function GetTask(aIdTask : Int64) : IScheduledTask; overload;
    function GetTask(const aTaskName : string) : IScheduledTask; overload;
    procedure Start;
    procedure Stop;
  end;
  338. implementation
  339. { TThreadedQueueCS<T> }
  // Discards every queued item and leaves the queue empty but usable.
  //
  // The capacity (Length(FQueue)) is kept and FQueueSize/FQueueOffset are reset.
  // The previous implementation shrank the buffer to zero length without
  // resetting the counters, so a later PushItem either waited forever
  // (size = capacity = 0) or divided by zero, and a later PopItem indexed an
  // empty array.
  //
  // Stored instances are freed only when T is exactly TObject, as decided by
  // the TypeInfo comparison below; every slot is then reset to Default(T),
  // which also drops any managed reference it held.
  procedure TThreadedQueueCS<T>.Clear;
  var
    obj : T;
    i : Integer;
  begin
    FQueueLock.Enter;
    try
      for obj in FQueue do
      begin
        if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
      end;
      // Empty every slot in place instead of dropping the buffer.
      for i := Low(FQueue) to High(FQueue) do FQueue[i] := Default(T);
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    // Wake waiting threads so they re-evaluate the queue state.
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Creates a queue with room for AQueueDepth items. PushTimeout and PopTimeout
  // are stored and later handed to the wait calls in PushItem and PopItem.
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
    // The buffer length is the queue capacity.
    SetLength(FQueue, AQueueDepth);
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  // Marks the queue as shut down (waking waiters) and then releases the
  // synchronization objects.
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited Destroy;
  end;
  // Changes the queue capacity by ADelta slots while preserving the queued
  // items and their FIFO order.
  //
  // The buffer is circular, so a plain SetLength is not enough: when the stored
  // items wrap around the end of the array, resizing alone would make
  // (FQueueOffset + n) mod Length(FQueue) address the wrong slots. The items
  // are therefore copied out in logical order, the buffer is resized, and the
  // items are written back starting at index 0.
  //
  // Raises EArgumentOutOfRangeException when the resulting capacity would be
  // smaller than the number of items currently queued (including a negative
  // capacity); the queue is left untouched in that case.
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    items : array of T;
    i, oldLen, newLen, slot : Integer;
  begin
    FQueueLock.Enter;
    try
      oldLen := Length(FQueue);
      newLen := oldLen + ADelta;
      if newLen < FQueueSize then
        raise EArgumentOutOfRangeException.Create('Grow: resulting capacity is smaller than the number of queued items');
      // Take the queued items out in FIFO order, emptying their old slots.
      SetLength(items, FQueueSize);
      for i := 0 to FQueueSize - 1 do
      begin
        slot := (FQueueOffset + i) mod oldLen;
        items[i] := FQueue[slot];
        FQueue[slot] := Default(T);
      end;
      SetLength(FQueue, newLen);
      // Store them again contiguously from the start of the resized buffer.
      for i := 0 to FQueueSize - 1 do FQueue[i] := items[i];
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    // Wake waiting threads so they re-evaluate the queue state.
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Pops the next item and returns it; the wait result and the remaining item
  // count are discarded.
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  // Removes the oldest item from the queue.
  //
  // AItem      receives the item, or Default(T) when nothing was removed.
  // AQueueSize receives the number of items left in the queue.
  // Returns wrSignaled when an item was removed, and also when the queue is
  // shut down and empty (AItem is then Default(T)); otherwise returns the
  // result of the failed wait (for example a timeout).
  //
  // NOTE(review): in the FPC branch the event is waited on while FQueueLock is
  // still held - confirm this cannot block producers.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait until there is an item, the queue is shut down or the wait fails.
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPopTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;
      if Result <> wrSignaled then Exit;
      if FShutDown and (FQueueSize = 0) then Exit;

      // Take the item and empty its slot.
      AItem := FQueue[FQueueOffset];
      FQueue[FQueueOffset] := Default(T);
      // The queue was full before this pop: wake threads waiting to push.
      if FQueueSize = Length(FQueue) then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;
      Inc(FTotalItemsPopped);
      Dec(FQueueSize);
      // Advance the read position, wrapping at the end of the buffer.
      Inc(FQueueOffset);
      if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Pops the next item into AItem and returns the wait result; the remaining
  // item count is discarded.
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops the next item and returns it; AQueueSize receives the remaining item
  // count. The wait result is discarded.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem and returns the wait result; the resulting item count is
  // discarded.
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  // Appends AItem to the queue, waiting while the queue is full.
  //
  // AQueueSize receives the number of items in the queue after the call.
  // Returns wrSignaled when the item was stored. When the queue has been shut
  // down the item is NOT stored, yet the result is still wrSignaled unless the
  // wait itself failed. Otherwise the failed wait result (for example a
  // timeout) is returned.
  //
  // NOTE(review): in the FPC branch the event is waited on while FQueueLock is
  // still held - confirm this cannot block consumers.
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    writeIndex: Integer;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait until there is a free slot, the queue is shut down or the wait fails.
      while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPushTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
        {$ENDIF}
      end;
      if FShutDown then Exit;
      if Result <> wrSignaled then Exit;

      // The queue was empty before this push: wake threads waiting to pop.
      if FQueueSize = 0 then
      begin
        {$IFDEF FPC}
        FQueueCondVar.SetEvent;
        {$ELSE}
        FQueueCondVar.ReleaseAll;
        {$ENDIF}
      end;
      // The write position follows the last stored item, wrapping around.
      writeIndex := (FQueueOffset + FQueueSize) mod Length(FQueue);
      FQueue[writeIndex] := AItem;
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Flags the queue as shut down and wakes every waiting thread so it can see
  // the flag. The flag is written without taking FQueueLock.
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FShutDown := True;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  489. { TThreadedQueueList<T> }
  // Creates the queue with an initial TQueue<T> capacity of AQueueDepth and
  // stores the two timeouts.
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    fPushTimeout := PushTimeout;
    fPopTimeout := PopTimeout;
    fQueueSize := 0;
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    fQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create;
    {$ELSE}
    fQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  // Marks the queue as shut down (waking waiters), then releases the
  // synchronization objects and the underlying TQueue<T>.
  destructor TThreadedQueueList<T>.Destroy;
  begin
    DoShutDown;
    fQueueLock.Free;
    fQueueCondVar.Free;
    fQueue.Free;
    inherited Destroy;
  end;
  // Raises the capacity of the underlying TQueue<T> by ADelta while holding
  // the lock, then wakes waiting threads.
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newCapacity: Integer;
  begin
    fQueueLock.Enter;
    try
      newCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := newCapacity;
    finally
      fQueueLock.Leave;
    end;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  // Pops the next item and returns it; the wait result and the remaining item
  // count are discarded.
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  // Removes the oldest item from the queue.
  //
  // AItem      receives the item, or Default(T) when nothing was removed.
  // AQueueSize receives the number of items left in the queue.
  // Returns wrSignaled when an item was removed, and also when the queue is
  // shut down and empty; otherwise returns the result of the failed wait.
  {$IFDEF FPC}
  // FPC variant: the event is waited on WITHOUT holding fQueueLock; the lock is
  // taken only for the extraction itself.
  // NOTE(review): fQueueSize is read outside the lock in the wait loop, so
  // another consumer could empty the queue before Extract runs - confirm.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FPopTimeout);
      end;
      fQueueLock.Enter;
      try
        if Result <> wrSignaled then Exit;
        if FShutDown and (fQueueSize = 0) then Exit;
        AItem := fQueue.Extract;
        Dec(FQueueSize);
        Inc(fTotalItemsPopped);
      finally
        fQueueLock.Leave;
      end;
    finally
      AQueueSize := fQueueSize;
    end;
  end;
  {$ELSE}
  // Delphi variant: the whole operation runs under fQueueLock, which the
  // condition variable releases while waiting.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait until there is an item, the queue is shut down or the wait fails.
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
      end;
      if Result <> wrSignaled then Exit;
      if FShutDown and (fQueueSize = 0) then Exit;

      AItem := fQueue.Extract;
      // Evaluated after the extraction, against the not yet decremented counter.
      if fQueueSize = fQueue.Count then
      begin
        FQueueCondVar.ReleaseAll;
      end;
      Dec(FQueueSize);
      Inc(fTotalItemsPopped);
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Pops the next item into AItem and returns the wait result; the remaining
  // item count is discarded.
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops the next item and returns it; AQueueSize receives the remaining item
  // count. The wait result is discarded.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem and returns the wait result; the resulting item count is
  // discarded.
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  // Appends AItem to the queue under FQueueLock.
  //
  // Neither variant waits for free space (the original wait loop on
  // fPushTimeout is disabled), so the call never blocks on a full queue.
  // AQueueSize receives the item count after the call. The result is always
  // wrSignaled; when the queue has been shut down the item is not stored.
  {$IFDEF FPC}
  // FPC variant: the item is enqueued without signalling FQueueCondVar.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      if fShutDown then Exit;
      fQueue.Enqueue(AItem);
      Inc(FQueueSize);
      Inc(fTotalItemsPushed);
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ELSE}
  // Delphi variant: waiting consumers are released when the queue was empty
  // before this push.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      if fShutDown then Exit;
      if fQueueSize = 0 then
      begin
        FQueueCondVar.ReleaseAll;
      end;
      fQueue.Enqueue(AItem);
      Inc(FQueueSize);
      Inc(fTotalItemsPushed);
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Flags the queue as shut down and wakes every waiting thread so it can see
  // the flag. The flag is written without taking fQueueLock.
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fShutDown := True;
    {$IFNDEF FPC}
    FQueueCondVar.ReleaseAll;
    {$ELSE}
    FQueueCondVar.SetEvent;
    {$ENDIF}
  end;
  662. {$IFNDEF FPC}
  663. { TThreadObjectList<T> }
  // Adds Item to the inner list while holding the lock, applying the
  // Duplicates policy: dupAccept always adds; otherwise an item that is already
  // present is skipped, or raises EListError when the policy is dupError.
  procedure TThreadObjectList<T>.Add(const Item: T);
  begin
    LockList;
    try
      if (Duplicates <> dupAccept) and (fList.IndexOf(Item) <> -1) then
      begin
        // Item already stored and duplicates are not accepted.
        if Duplicates = dupError then
          raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
        Exit;
      end;
      fList.Add(Item);
    finally
      UnlockList;
    end;
  end;
  // Empties the inner list while holding the lock.
  procedure TThreadObjectList<T>.Clear;
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.Clear;
    finally
      UnlockList;
    end;
  end;
  // Creates the lock object and the inner object list.
  //
  // OwnedObjects is forwarded to the inner TObjectList<T> and decides whether
  // the list frees its items when they are removed or when the list is
  // destroyed. It was previously ignored, which made the list always own its
  // items regardless of the argument.
  // Duplicates starts as dupIgnore.
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  // Frees the inner list and runs the inherited destructor while the monitor
  // is held, then leaves the monitor and frees the lock object.
  destructor TThreadObjectList<T>.Destroy;
  begin
    System.TMonitor.Enter(fLock);
    try
      fList.Free;
      inherited Destroy;
    finally
      System.TMonitor.Exit(fLock);
      fLock.Free;
    end;
  end;
  // Returns the item stored at aIndex, read while holding the lock.
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      Result := locked[aIndex];
    finally
      UnlockList;
    end;
  end;
  // Enters the monitor on fLock and returns the inner list. Every call must be
  // matched by a call to UnlockList.
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;
  // Removes Item, searching from the beginning of the list.
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    RemoveItem(Item, TDirection.FromBeginning);
  end;
  // Removes Item from the inner list while holding the lock; Direction selects
  // the end of the list from which the search starts.
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  // Replaces the item stored at aIndex with aValue while holding the lock.
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    locked: TObjectList<T>;
  begin
    locked := LockList;
    try
      locked[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  // Leaves the monitor entered by LockList.
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  741. {$ENDIF}
  742. { TAnonymousThread }
  // Wraps aProc in a thread obtained from TThread.CreateAnonymousThread. The
  // thread is only created here; it is started by Start.
  constructor TAnonymousThread.Create(aProc : TProc);
  begin
    {$IFDEF FPC}
    fThread := TThread.CreateAnonymousThread(@aProc);
    {$ELSE}
    fThread := TThread.CreateAnonymousThread(aProc);
    {$ENDIF}
  end;
  // Factory: builds an anonymous thread around aProc and returns its handle.
  // The thread is not started until Start is called on the result.
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  var
    wrapper: TAnonymousThread;
  begin
    wrapper := TAnonymousThread.Create(aProc);
    Result := wrapper;
  end;
  // TThread.OnTerminate handler installed by OnTerminate; forwards the
  // notification to the stored callback.
  //
  // The callback is invoked only when one is assigned: OnTerminate installs
  // this handler unconditionally, so a nil callback passed there would
  // otherwise be called here.
  // NOTE(review): this handler runs on an object that is reference counted
  // independently of the thread - confirm the handle outlives the thread.
  procedure TAnonymousThread.NotifyTerminate(Sender: TObject);
  begin
    if Assigned(fTerminateProc) then fTerminateProc();
  end;
  // Stores aProc as the termination callback, hooks the thread's OnTerminate
  // event to NotifyTerminate and returns this same handle for chaining.
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fTerminateProc := aProc;
    fThread.OnTerminate := Self.NotifyTerminate;
    Result := Self;
  end;
  // Starts the wrapped thread.
  procedure TAnonymousThread.Start;
  begin
    fThread.Start;
  end;
  769. { TTask }
  // Creates a pending, not yet enabled task.
  //
  // aParamArray  values copied into the task's own parameter array.
  // aOwnedParams when True, Destroy frees object-typed parameters.
  // aTaskProc    the procedure run by DoExecute.
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fOwnedParams := aOwnedParams;
    // One TFlexValue per supplied parameter.
    SetLength(fParamArray, Length(aParamArray));
    for idx := 0 to High(aParamArray) do
    begin
      fParamArray[idx].Create(aParamArray[idx]);
      {$IFDEF FPC}
      fParamArray[idx]._AddRef;
      {$ENDIF}
    end;
    fExecuteProc := aTaskProc;
    fEnabled := False;
  end;
  // Releases the parameter array. When the task owns its parameters, every
  // non-nil object-typed parameter is freed first.
  // NOTE(review): under FPC _Release is called only for owned parameters while
  // Create calls _AddRef for every parameter - confirm this is balanced.
  destructor TTask.Destroy;
  var
    idx : Integer;
  begin
    if fOwnedParams then
    begin
      for idx := 0 to High(fParamArray) do
      begin
        {$IFDEF FPC}
        fParamArray[idx]._Release;
        {$ENDIF}
        if fParamArray[idx].DataType = dtObject then
        begin
          if fParamArray[idx].AsObject <> nil then fParamArray[idx].AsObject.Free;
        end;
      end;
    end;
    fParamArray := nil;
    inherited Destroy;
  end;
  // Records that the task failed and reports aException.
  //
  // Sets the status to wtsException. When an exception callback is registered
  // it is called with the task and the exception; otherwise aException is
  // raised to the caller.
  //
  // aException is normally the object the caller is currently handling in its
  // own "except" block. Raising that same object again would let the runtime
  // free it when the caller's handler unwinds while the new raise still refers
  // to it. Ownership is therefore taken with AcquireExceptionObject before the
  // object is raised again.
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    if Assigned(fExceptProc) then fExceptProc(Self,aException)
    else
    begin
      // Detach the in-flight exception from the active handler first.
      if ExceptObject = aException then AcquireExceptionObject;
      raise aException;
    end;
  end;
  // Runs the task body: the status is wtsRunning while the procedure executes
  // and becomes wtsDone once it returns normally. An exception raised by the
  // procedure leaves this method before the status is set to wtsDone.
  procedure TTask.DoExecute;
  begin
    fTaskStatus := TWorkTaskStatus.wtsRunning;
    if Assigned(fExecuteProc) then
    begin
      fExecuteProc(Self);
    end;
    fTaskStatus := TWorkTaskStatus.wtsDone;
  end;
  // Calls the termination callback, if one is registered.
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  // Getter for the IdTask property.
  function TTask.GetIdTask: Int64;
  begin
    Result := fIdTask;
  end;
  // Stores the identifier of the task.
  procedure TTask.SetIdTask(Value : Int64);
  begin
    fIdTask := Value;
  end;
  // Getter for the NumWorker property.
  function TTask.GetNumWorker: Integer;
  begin
    Result := fNumWorker;
  end;
  // Returns the parameter stored at aIndex in the task's parameter array.
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  begin
    Result := fParamArray[aIndex];
  end;
  // Returns the enabled flag of the task.
  function TTask.IsEnabled: Boolean;
  begin
    Result := fEnabled;
  end;
  // Stores the worker number and, as a side effect, moves the task to the
  // wtsAssigned state.
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fNumWorker := Value;
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
  end;
  // Returns the current life-cycle state of the task.
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Result := fTaskStatus;
  end;
  850. { TWorkTask }
  // Registers the callback used by DoException and returns this task so that
  // calls can be chained.
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Registers the callback used by DoTerminate and returns this task so that
  // calls can be chained.
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Marks the task as enabled; IsEnabled returns True afterwards.
  procedure TWorkTask.Run;
  begin
    fEnabled := True;
  end;
  865. { TBackgroundTasks }
  // Discards the queued tasks by clearing the task queue.
  procedure TBackgroundTasks.CancelAll;
  begin
    fTaskQueue.Clear;
  end;
  // Creates the task queue. Workers are not created here; that happens in Start.
  //
  // aConcurrentWorkers number of workers Start will create.
  // aMaxQueue          capacity of the task queue.
  // The queue is built with an INFINITE insert timeout and an extract timeout
  // of 5000.
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
  begin
    fNumPushedTasks := 0;
    fConcurrentWorkers := aConcurrentWorkers;
    fMaxQueue := aMaxQueue;
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  end;
  // Stops the workers and releases the queue.
  //
  // The queue is shut down before the worker pool is freed. A worker that is
  // blocked in PopItem on an empty queue is then woken immediately instead of
  // holding up the destructor until the extract timeout expires. Items still in
  // the queue remain poppable after DoShutDown, as before.
  destructor TBackgroundTasks.Destroy;
  begin
    if Assigned(fTaskQueue) then fTaskQueue.DoShutDown;
    // Free is nil-safe; the pool only exists after Start has been called.
    fWorkerPool.Free;
    fTaskQueue.Free;
    inherited;
  end;
  // Getter for TaskQueued: the number of items currently in the task queue.
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Result := fTaskQueue.QueueSize;
  end;
  // Queues a task that takes no parameters; forwards to the full overload with
  // an empty parameter list and aOwnedParams = False.
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  begin
    Result := AddTask([], False, aTaskProc);
  end;
  // Creates a work task, assigns it the next task id and pushes it onto the
  // task queue.
  //
  // aParamArray  values made available to the task through its Param property.
  // aOwnedParams when True, the task frees object parameters on destruction.
  // aTaskProc    the procedure the task executes.
  // Returns the queued task, or nil when PushItem does not return wrSignaled
  // (for example on a timeout).
  //
  // Result is set to nil explicitly: an interface-typed function result is not
  // guaranteed to be cleared on entry, so the failure path could otherwise
  // hand back whatever the caller's variable held before.
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    worktask : IWorkTask;
  begin
    Result := nil;
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    // The running counter doubles as the task id.
    Inc(fNumPushedTasks);
    worktask.SetIdTask(fNumPushedTasks);
    if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
    begin
      Result := worktask;
    end;
  end;
  // Replaces any existing worker pool with a fresh, owning one and creates and
  // starts ConcurrentWorkers workers, numbered from 1, on the shared queue.
  procedure TBackgroundTasks.Start;
  var
    num : Integer;
    newWorker : TWorker;
  begin
    // Free is nil-safe, so no pool check is needed before recreating it.
    fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    for num := 1 to fConcurrentWorkers do
    begin
      newWorker := TWorker.Create(num, fTaskQueue);
      fWorkerPool.Add(newWorker);
      newWorker.Start;
    end;
  end;
  920. { TWorker }
  // Creates the worker thread suspended, bound to the given queue. The thread
  // is not freed automatically when it terminates.
  constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fTaskQueue := aTaskQueue;
    fNumWorker := aNumWorker;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  // Thread body: pops and executes tasks for as long as the thread is not
  // terminated and the queue reports at least one item. The loop, and with it
  // the thread, ends as soon as the queue is found empty.
  //
  // For each task DoExecute is called; an exception is handed to the task's
  // DoException, and DoTerminate always runs afterwards.
  procedure TWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while (not Terminated) and (fTaskQueue.QueueSize > 0) do
    begin
      fCurrentTask := fTaskQueue.PopItem;
      // Nothing was popped: check the loop condition again.
      if fCurrentTask = nil then Continue;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fCurrentIdTask := fCurrentTask.GetIdTask;
          fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask = nil then raise Exception.Create(e.Message);
            fCurrentTask.DoException(E);
          end;
        end;
      finally
        fCurrentTask.DoTerminate;
        fStatus := TWorkerStatus.wsIdle;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  955. { TScheduledTasks }
  //Convenience overload: registers aTaskProc under aTaskName with an empty,
  //non-owned parameter array. Returns what the full AddTask overload returns.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := AddTask(aTaskName,[],False,aTaskProc);
  end;
  //Creates a scheduled task, names it, gives it the next task id and appends it to the task list.
  //  aTaskName    : name stored in the task
  //  aParamArray  : parameters handed to the task
  //  aOwnedParams : forwarded to TScheduledTask.Create
  //  aTaskProc    : procedure the task will run
  //Returns the new task.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newtask : TScheduledTask;
  begin
    newtask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newtask.Name := aTaskName;
    //ids are consecutive, starting at 1
    Inc(fNumPushedTasks);
    newtask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newtask);
    Result := newtask;
  end;
  //Creates an empty task list; the scheduler thread is not created here but in Start.
  constructor TScheduledTasks.Create;
  begin
    fTaskList := TScheduledTaskList.Create;
    fIsStarted := False;
    fNumPushedTasks := 0;
  end;
  //Stops and releases the scheduler thread (when one exists), then the task list.
  destructor TScheduledTasks.Destroy;
  begin
    if fScheduler <> nil then
    begin
      //wait until the scheduler thread has ended before releasing it
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    //Free does nothing on a nil reference
    fTaskList.Free;
    inherited;
  end;
  //Looks up a task by its id.
  //  aIdTask : id to search for
  //Returns the matching task, or nil when none is found.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    Result := nil;
    if Assigned(fScheduler) then
    begin
      Result := fScheduler.Get(aIdTask);
      Exit;
    end;
    //the scheduler is only created by Start; before that, search the task
    //list directly instead of dereferencing a nil scheduler
    for task in fTaskList do
    begin
      if task.IdTask = aIdTask then Exit(task);
    end;
  end;
  //Looks up a task by name (compared with CompareText).
  //  aTaskName : name to search for
  //Returns the matching task, or nil when none is found.
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    Result := nil;
    if Assigned(fScheduler) then
    begin
      Result := fScheduler.Get(aTaskName);
      Exit;
    end;
    //the scheduler is only created by Start; before that, search the task
    //list directly instead of dereferencing a nil scheduler
    for task in fTaskList do
    begin
      if CompareText(task.Name,aTaskName) = 0 then Exit(task);
    end;
  end;
  //Starts the scheduler thread over the task list. Does nothing when already started.
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;
    //a scheduler that is still assigned here was terminated by Stop; a thread
    //cannot be started a second time, so it is released and replaced
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      FreeAndNil(fScheduler);
    end;
    fScheduler := TScheduler.Create(fTaskList);
    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    fScheduler.Start;
    fIsStarted := True;
  end;
  //Asks the scheduler thread (if any) to terminate and marks the scheduler as not started.
  //The thread is neither waited for nor released here.
  procedure TScheduledTasks.Stop;
  begin
    if fScheduler <> nil then
    begin
      fScheduler.Terminate;
    end;
    fIsStarted := False;
  end;
  1012. { TScheduledTask }
  //Resets the schedule and sets aStartDate as start date and next execution.
  //The mode is left as run-once. Returns the task itself so calls can be chained.
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    fNextExecution := aStartDate;
    fStartDate := aStartDate;
    Result := Self;
  end;
  //Enables the task in repeat mode, running every aInterval units of aTimeMeasure.
  //Expiration fields are not assigned by this overload.
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    //no start date configured: reset the schedule and start counting from now
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
      fStartDate := Now();
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  //Enables the task in repeat mode, running every aInterval units of aTimeMeasure,
  //and stores aEndTime as its expiration date.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    //no start date configured: reset the schedule and start counting from now
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
      fStartDate := Now();
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    fExpirationDate := aEndTime;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  //Enables the task in repeat mode, running every aInterval units of aTimeMeasure,
  //and stores aRepeatTimes as its execution-count limit.
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    //no start date configured: reset the schedule and start counting from now
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
      fStartDate := Now();
    end;
    fScheduleMode := TScheduleMode.smRepeatMode;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    fExpirationTimes := aRepeatTimes;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  //Enables the task in run-once mode. A start date set beforehand (StartAt)
  //is kept; without one the task is scheduled for now.
  procedure TScheduledTask.RunOnce;
  begin
    //reset only when no start date was configured, as RepeatEvery does;
    //an unconditional reset would wipe the date set by StartAt
    if fStartDate = 0.0 then ClearSchedule;
    fScheduleMode := TScheduleMode.smRunOnce;
    if fStartDate = 0.0 then fStartDate := Now();
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  //Flags the task as finished; IsFinished returns True from now on.
  procedure TScheduledTask.Cancel;
  begin
    fFinished := True;
  end;
  //Tells whether the task has to be launched now and updates its schedule state.
  //Returns True when the task is due; in that case the last-execution time and
  //the execution counter have already been updated. Returns False when the task
  //is running, not yet due, or (repeat mode) has reached one of its expiration
  //limits - the task is then flagged as finished.
  function TScheduledTask.CheckSchedule: Boolean;
  var
    limitreached : Boolean;
  begin
    Result := False;
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      //a run-once task is only due while it has never been executed
      if fExecutionTimes <> 0 then Exit;
      if not (Now() >= fNextExecution) then Exit;
    end
    else
    begin
      //next execution not reached yet
      if not (Now() >= fNextExecution) then Exit;
      //check expiration limits
      limitreached := ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
                      ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate));
      if limitreached then
      begin
        fFinished := True;
        Exit;
      end;
      //calculate next execution
      case fTimeMeasure of
        tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
        tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
        tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
        tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      end;
    end;
    //task is due: register the execution
    fLastExecution := Now();
    Inc(fExecutionTimes);
    Result := True;
  end;
  //Puts the schedule back to its defaults: not finished, no dates, run-once
  //mode, interval of 0 seconds. Runs the inherited implementation first.
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    fFinished := False;
    //schedule kind
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeMeasure := TTimeMeasure.tmSeconds;
    fTimeInterval := 0;
    //dates
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
  end;
  //Runs the expiration callback, when one was set, and then disables the task.
  procedure TScheduledTask.DoExpire;
  begin
    if Assigned(fExpiredProc) then
    begin
      fExpiredProc(Self);
    end;
    fEnabled := False;
  end;
  //Returns the name stored in the task.
  function TScheduledTask.GetTaskName: string;
  begin
    Result := fName;
  end;
  //Returns the finished flag of the task.
  function TScheduledTask.IsFinished: Boolean;
  begin
    Result := fFinished;
  end;
  //Stores the exception callback. Returns the task itself so calls can be chained.
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  //Stores the expiration callback. Returns the task itself so calls can be chained.
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  //Stores the termination callback. Returns the task itself so calls can be chained.
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  1144. { TScheduledWorker }
  //Creates a worker thread bound to a single scheduled task; no task queue is used.
  //  aNumWorker     : number identifying this worker
  //  aScheduledTask : task the worker will run
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create(aNumWorker,nil);
    fTask := aScheduledTask;
    //the thread object is released automatically when Execute ends
    FreeOnTerminate := True;
  end;
  //Thread body: runs the bound task once. An exception raised by the task is
  //passed to its DoException and DoTerminate is always called afterwards.
  //The task reference is released at the end.
  procedure TScheduledWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    if Assigned(fTask) then
    begin
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fTask.DoExecute;
          fStatus := TWorkerStatus.wsIdle;
        except
          //fTask is known to be assigned here, so the exception is always
          //delivered to the task itself
          on E : Exception do fTask.DoException(E);
        end;
      finally
        fTask.DoTerminate;
      end;
    end;
    fTask := nil;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  1175. { TScheduler }
  //Creates the scheduler thread suspended, working on the given task list.
  //  aTaskList : list of scheduled tasks; only the reference is stored
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    inherited Create(True);
    //the thread object is not released automatically when Execute ends
    FreeOnTerminate := False;
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    //event Execute waits on between two passes over the list
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  //Stops the scheduler thread and releases the event and the list lock.
  //The task list itself is not freed here; only the reference is cleared.
  destructor TScheduler.Destroy;
  begin
    //ask Execute to leave its loop and wake it from its wait
    Terminate;
    fCondVar.SetEvent;
    //the inherited destructor waits for the thread to end; the event and the
    //lock must outlive Execute, so they are only released afterwards
    inherited;
    fCondVar.Free;
    fListLock.Free;
    fTaskList := nil;
  end;
  //Thread body: until terminated, walks the task list under the list lock.
  //A task that is not finished and is due (CheckSchedule) gets its own
  //TScheduledWorker; a finished task that is still enabled is expired and,
  //when fRemoveTaskAfterExpiration is set, removed from the list.
  //Between two passes the thread waits on the event for up to 250.
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    expired : TList<IScheduledTask>;
  begin
    numworker := 0;
    expired := TList<IScheduledTask>.Create;
    try
      while not Terminated do
      begin
        fListLock.Enter;
        try
          for task in fTaskList do
          begin
            if not task.IsFinished then
            begin
              if task.CheckSchedule then
              begin
                Inc(numworker);
                worker := TScheduledWorker.Create(numworker,task);
                worker.Start;
              end;
            end
            else
            begin
              if task.IsEnabled then
              begin
                task.DoExpire;
                //removing while the list is being enumerated would shift the
                //items under the enumerator, so the removal is deferred
                if fRemoveTaskAfterExpiration then expired.Add(task);
              end;
            end;
          end;
          //enumeration ended: now it is safe to remove the expired tasks
          for task in expired do fTaskList.Remove(task);
          expired.Clear;
          task := nil;
        finally
          fListLock.Leave;
        end;
        fCondVar.WaitFor(250);
      end;
    finally
      expired.Free;
    end;
  end;
  //Appends a task to the task list.
  //  aTask : task to add
  //Returns the value returned by the list's Add.
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    //Execute and Get walk the list under fListLock, so adding takes it too
    fListLock.Enter;
    try
      Result := fTaskList.Add(aTask);
    finally
      fListLock.Leave;
    end;
  end;
  //Searches the task list, under the list lock, for the task with the given id.
  //  aIdTask : id to search for
  //Returns the first matching task, or nil when there is none.
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //an interface Result is not guaranteed to be cleared on entry, so the
    //not-found case must assign it explicitly
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  //Searches the task list, under the list lock, for the task with the given
  //name (compared with CompareText).
  //  aTaskName : name to search for
  //Returns the first matching task, or nil when there is none.
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //an interface Result is not guaranteed to be cleared on entry, so the
    //not-found case must assign it explicitly
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  1267. end.