Quick.Threads.pas 65 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2020 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 27/06/2020
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue;
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. {$IFNDEF FPC}
  209. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  210. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  211. {$ELSE}
  212. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  213. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  214. {$ENDIF}
  215. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  216. property Result : TFlexValue read GetResult write SetResult;
  217. property IdTask : Int64 read GetIdTask;
  218. function Done : Boolean;
  219. function Failed : Boolean;
  220. function NumRetries : Integer;
  221. function MaxRetries : Integer;
  222. function LastException : Exception;
  223. function CircuitBreaked : Boolean;
  224. function IsEnabled : Boolean;
  225. end;
  226. {$IFNDEF FPC}
  227. TTaskProc = reference to procedure(task : ITask);
  228. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  229. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  230. {$ELSE}
  231. TTaskProc = procedure(task : ITask) of object;
  232. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  233. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  234. {$ENDIF}
  235. IWorkTask = interface(ITask)
  236. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  237. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  238. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  239. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  240. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  241. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  242. function Retry(aMaxRetries : Integer) : IWorkTask;
  243. function RetryForever : IWorkTask;
  244. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  245. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  246. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  247. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  248. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  249. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  250. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  251. procedure Run;
  252. end;
  253. IScheduledTask = interface(ITask)
  254. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  255. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  256. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  257. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  258. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  259. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  260. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  261. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  262. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  263. function Retry(aMaxRetries : Integer) : IScheduledTask;
  264. function RetryForever : IScheduledTask;
  265. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  266. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  267. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  268. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  269. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  270. function CheckSchedule : Boolean;
  271. procedure DoExpire;
  272. function GetTaskName : string;
  273. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  274. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  275. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  276. function StartOnDayChange : IScheduledTask;
  277. function StartNow : IScheduledTask;
  278. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  279. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  280. procedure RunOnce;
  281. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  282. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  284. procedure RepeatEveryDay;
  285. procedure RepeatEveryWeek;
  286. function IsFinished : Boolean;
  287. procedure Cancel;
  288. property Name : string read GetTaskName;
  289. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  290. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  291. end;
  292. TTask = class(TInterfacedObject,ITask)
  293. private
  294. fIdTask : Int64;
  295. fNumWorker : Integer;
  296. fNumRetries : Integer;
  297. fParamList : TParamList;
  298. fInitializeProc : TTaskProc;
  299. fExecuteProc : TTaskProc;
  300. fExceptProc : TTaskExceptionProc;
  301. fTerminateProc : TTaskProc;
  302. fExpiredProc : TTaskProc;
  303. fTaskStatus : TWorkTaskStatus;
  304. fOwnedParams : Boolean;
  305. fEnabled : Boolean;
  306. fExecuteWithSync : Boolean;
  307. fExceptionWithSync : Boolean;
  308. fRetryProc : TTaskRetryProc;
  309. fTerminateWithSync : Boolean;
  310. fFaultControl : TFaultControl;
  311. fCustomFaultPolicy : Boolean;
  312. fResult : TFlexValue;
  313. function GetParam(aIndex : Integer) : TFlexValue; overload;
  314. function GetParam(const aName : string) : TFlexValue; overload;
  315. function GetParam2(aIndex : Integer) : PFlexValue;
  316. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  317. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  318. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  319. procedure DoInitialize;
  320. procedure DoExecute;
  321. procedure DoException(aException : Exception);
  322. procedure DoTerminate;
  323. function GetNumWorker : Integer;
  324. procedure SetNumWorker(Value : Integer);
  325. function GetIdTask : Int64;
  326. procedure SetIdTask(Value : Int64);
  327. function GetResult : TFlexValue;
  328. procedure SetResult(aValue : TFlexValue);
  329. protected
  330. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  331. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  332. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  333. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  334. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  335. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  336. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  337. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  338. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  339. public
  340. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  341. destructor Destroy; override;
  342. property IdTask : Int64 read GetIdTask;
  343. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  344. function IsEnabled : Boolean;
  345. function TaskStatus : TWorkTaskStatus;
  346. function Done : Boolean;
  347. function Failed : Boolean;
  348. function NumRetries : Integer;
  349. function MaxRetries : Integer;
  350. function LastException : Exception;
  351. function CircuitBreaked : Boolean;
  352. end;
  353. TWorkTask = class(TTask,IWorkTask)
  354. public
  355. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  356. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  357. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  358. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  359. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  360. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  361. function Retry(aMaxRetries : Integer) : IWorkTask;
  362. function RetryForever : IWorkTask;
  363. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  364. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  365. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  366. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  367. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  368. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  369. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  370. procedure Run; virtual;
  371. end;
  372. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  373. TScheduledTask = class(TTask,IScheduledTask)
  374. private
  375. fName : string;
  376. fExecutionTimes : Integer;
  377. fScheduleMode : TScheduleMode;
  378. fTimeInterval : Integer;
  379. fTimeMeasure : TTimeMeasure;
  380. fStartDate : TDateTime;
  381. fLastExecution : TDateTime;
  382. fNextExecution : TDateTime;
  383. fExpirationDate : TDateTime;
  384. fExpirationTimes : Integer;
  385. fFinished : Boolean;
  386. fExpireWithSync: Boolean;
  387. procedure ClearSchedule;
  388. function CheckSchedule : Boolean;
  389. procedure DoExpire;
  390. function GetTaskName : string;
  391. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  392. protected
  393. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  394. public
  395. property Name : string read fName write fName;
  396. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  397. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  398. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  399. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  400. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  401. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  402. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  403. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  404. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  405. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  406. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  407. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  408. function StartOnDayChange : IScheduledTask;
  409. function StartNow : IScheduledTask;
  410. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  411. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  412. procedure RunOnce;
  413. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  414. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  415. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  416. procedure RepeatEveryDay;
  417. procedure RepeatEveryWeek;
  418. function Retry(aMaxRetries : Integer) : IScheduledTask;
  419. function RetryForever : IScheduledTask;
  420. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  421. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  422. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  423. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  424. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  425. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  426. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  427. function IsFinished : Boolean;
  428. procedure Cancel;
  429. end;
  430. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  431. TWorker = class(TThread)
  432. protected
  433. fStatus : TWorkerStatus;
  434. fCurrentTask : ITask;
  435. fDefaultFaultPolicy : TFaultPolicy;
  436. procedure ExecuteTask;
  437. procedure TerminateTask;
  438. public
  439. constructor Create;
  440. destructor Destroy; override;
  441. property Status : TWorkerStatus read fStatus;
  442. procedure SetFaultPolicy(aTask : TTask);
  443. procedure Execute; override;
  444. end;
  445. TSimpleWorker = class(TWorker)
  446. public
  447. constructor Create(aTask : ITask);
  448. procedure Execute; override;
  449. end;
  450. TQueueWorker = class(TWorker)
  451. private
  452. fCurrentIdTask : Integer;
  453. fNumWorker : Integer;
  454. fTaskQueue : TTaskQueue;
  455. public
  456. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  457. property NumWorker : Integer read fNumWorker;
  458. procedure Execute; override;
  459. end;
  460. TScheduledWorker = class(TWorker)
  461. private
  462. procedure ExpireTask;
  463. public
  464. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  465. procedure Execute; override;
  466. end;
  467. TWorkerPool = TObjectList<TWorker>;
  468. TRunTask = class
  469. public
  470. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  471. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  472. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  473. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  474. end;
  475. TBackgroundTasks = class
  476. private
  477. fMaxQueue : Integer;
  478. fWorkerPool : TWorkerPool;
  479. fConcurrentWorkers : Integer;
  480. fInsertTimeout : Cardinal;
  481. fExtractTimeout : Cardinal;
  482. fTaskQueue : TTaskQueue;
  483. fNumPushedTasks : Int64;
  484. function GetTaskQueue : Cardinal;
  485. public
  486. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  487. destructor Destroy; override;
  488. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  489. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  490. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  491. property TaskQueued : Cardinal read GetTaskQueue;
  492. property NumPushedTasks : Int64 read fNumPushedTasks;
  493. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  494. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  495. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  496. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  497. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  498. procedure Start;
  499. procedure CancelAll;
  500. end;
  501. TScheduledTaskList = TList<IScheduledTask>;
  502. TScheduler = class(TThread)
  503. private
  504. fListLock : TCriticalSection;
  505. fCondVar : TSimpleEvent;
  506. fTaskList : TScheduledTaskList;
  507. fRemoveTaskAfterExpiration : Boolean;
  508. public
  509. constructor Create(aTaskList : TScheduledTaskList);
  510. destructor Destroy; override;
  511. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  512. procedure Execute; override;
  513. function Add(aTask : TScheduledTask) : Integer;
  514. function Get(aIdTask : Int64) : IScheduledTask; overload;
  515. function Get(const aTaskName : string) : IScheduledTask; overload;
  516. end;
  517. TScheduledTasks = class
  518. private
  519. fTaskList : TScheduledTaskList;
  520. fScheduler : TScheduler;
  521. fNumPushedTasks : Int64;
  522. fRemoveTaskAfterExpiration : Boolean;
  523. fIsStarted : Boolean;
  524. fFaultPolicy : TFaultPolicy;
  525. public
  526. constructor Create;
  527. destructor Destroy; override;
  528. property NumPushedTasks : Int64 read fNumPushedTasks;
  529. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  530. property IsStarted : Boolean read fIsStarted;
  531. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  532. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  533. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  534. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  535. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  536. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  537. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  538. procedure Start;
  539. procedure Stop;
  540. end;
  541. implementation
  542. { TThreadedQueueCS<T> }
  543. procedure TThreadedQueueCS<T>.Clear;
  544. var
  545. obj : T;
  546. begin
  547. FQueueLock.Enter;
  548. try
  549. for obj in FQueue do
  550. begin
  551. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  552. end;
  553. SetLength(FQueue,0);
  554. finally
  555. FQueueLock.Leave;
  556. end;
  557. {$IFDEF FPC}
  558. FQueueCondVar.SetEvent;
  559. {$ELSE}
  560. FQueueCondVar.ReleaseAll;
  561. {$ENDIF}
  562. end;
  563. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  564. begin
  565. inherited Create;
  566. if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
  567. SetLength(FQueue, AQueueDepth);
  568. FQueueLock := TCriticalSection.Create;
  569. {$IFDEF FPC}
  570. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  571. {$ELSE}
  572. FQueueCondVar := TConditionVariableCS.Create;
  573. {$ENDIF}
  574. FPushTimeout := PushTimeout;
  575. FPopTimeout := PopTimeout;
  576. end;
  577. destructor TThreadedQueueCS<T>.Destroy;
  578. begin
  579. DoShutDown;
  580. FQueueLock.Free;
  581. FQueueCondVar.Free;
  582. inherited;
  583. end;
  584. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  585. begin
  586. FQueueLock.Enter;
  587. try
  588. SetLength(FQueue, Length(FQueue) + ADelta);
  589. finally
  590. FQueueLock.Leave;
  591. end;
  592. {$IFDEF FPC}
  593. FQueueCondVar.SetEvent;
  594. {$ELSE}
  595. FQueueCondVar.ReleaseAll;
  596. {$ENDIF}
  597. end;
  598. function TThreadedQueueCS<T>.PopItem: T;
  599. var
  600. LQueueSize: Integer;
  601. begin
  602. PopItem(LQueueSize, Result);
  603. end;
  604. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  605. begin
  606. AItem := Default(T);
  607. FQueueLock.Enter;
  608. try
  609. Result := wrSignaled;
  610. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  611. begin
  612. {$IFDEF FPC}
  613. Result := FQueueCondVar.WaitFor(FPopTimeout);
  614. {$ELSE}
  615. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  616. {$ENDIF}
  617. end;
  618. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  619. AItem := FQueue[FQueueOffset];
  620. FQueue[FQueueOffset] := Default(T);
  621. if FQueueSize = Length(FQueue) then
  622. begin
  623. {$IFDEF FPC}
  624. FQueueCondVar.SetEvent;
  625. {$ELSE}
  626. FQueueCondVar.ReleaseAll;
  627. {$ENDIF}
  628. end;
  629. Dec(FQueueSize);
  630. Inc(FQueueOffset);
  631. Inc(FTotalItemsPopped);
  632. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  633. finally
  634. AQueueSize := FQueueSize;
  635. FQueueLock.Leave;
  636. end;
  637. end;
  638. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  639. var
  640. LQueueSize: Integer;
  641. begin
  642. Result := PopItem(LQueueSize, AItem);
  643. end;
  644. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  645. begin
  646. PopItem(AQueueSize, Result);
  647. end;
  648. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  649. var
  650. LQueueSize: Integer;
  651. begin
  652. Result := PushItem(AItem, LQueueSize);
  653. end;
  654. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  655. begin
  656. FQueueLock.Enter;
  657. try
  658. if FQueueSize >= High(FQueue) then
  659. begin
  660. if FQueueSize < 1024 then Grow(FQueueSize)
  661. else Grow(FQueueSize Div 2);
  662. end;
  663. Result := wrSignaled;
  664. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  665. begin
  666. {$IFDEF FPC}
  667. Result := FQueueCondVar.WaitFor(FPushTimeout);
  668. {$ELSE}
  669. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  670. {$ENDIF}
  671. end;
  672. if FShutDown or (Result <> wrSignaled) then Exit;
  673. if FQueueSize = 0 then
  674. begin
  675. {$IFDEF FPC}
  676. FQueueCondVar.SetEvent;
  677. {$ELSE}
  678. FQueueCondVar.ReleaseAll;
  679. {$ENDIF}
  680. end;
  681. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  682. Inc(FQueueSize);
  683. Inc(FTotalItemsPushed);
  684. finally
  685. AQueueSize := FQueueSize;
  686. FQueueLock.Leave;
  687. end;
  688. end;
  689. procedure TThreadedQueueCS<T>.DoShutDown;
  690. begin
  691. FShutDown := True;
  692. {$IFDEF FPC}
  693. FQueueCondVar.SetEvent;
  694. {$ELSE}
  695. FQueueCondVar.ReleaseAll;
  696. {$ENDIF}
  697. end;
  698. { TThreadedQueueList<T> }
  699. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  700. begin
  701. inherited Create;
  702. fQueue := TQueue<T>.Create;
  703. fQueue.Capacity := AQueueDepth;
  704. fQueueSize := 0;
  705. fQueueLock := TCriticalSection.Create;
  706. {$IFDEF FPC}
  707. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  708. {$ELSE}
  709. fQueueCondVar := TConditionVariableCS.Create;
  710. {$ENDIF}
  711. fPushTimeout := PushTimeout;
  712. fPopTimeout := PopTimeout;
  713. end;
  714. destructor TThreadedQueueList<T>.Destroy;
  715. begin
  716. DoShutDown;
  717. fQueueLock.Free;
  718. fQueueCondVar.Free;
  719. fQueue.Free;
  720. inherited;
  721. end;
  722. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  723. begin
  724. fQueueLock.Enter;
  725. try
  726. fQueue.Capacity := fQueue.Capacity + ADelta;
  727. finally
  728. fQueueLock.Leave;
  729. end;
  730. {$IFDEF FPC}
  731. FQueueCondVar.SetEvent;
  732. {$ELSE}
  733. FQueueCondVar.ReleaseAll;
  734. {$ENDIF}
  735. end;
  736. function TThreadedQueueList<T>.PopItem: T;
  737. var
  738. LQueueSize: Integer;
  739. begin
  740. PopItem(LQueueSize, Result);
  741. end;
  742. {$IFDEF FPC}
  743. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  744. //var
  745. //crono : TChronometer;
  746. begin
  747. AItem := Default(T);
  748. //crono := TChronometer.Create(False);
  749. try
  750. Result := wrSignaled;
  751. //writeln('popitem');
  752. //crono.Start;
  753. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  754. begin
  755. //crono.Start;
  756. Result := FQueueCondVar.WaitFor(FPopTimeout);
  757. //crono.Stop;
  758. //writeln('in: ' + crono.ElapsedTime);
  759. //if result = twaitresult.wrError then result := twaitresult.wrError;
  760. end;
  761. //crono.Stop;
  762. //writeln('out: ' + crono.ElapsedTime);
  763. fQueueLock.Enter;
  764. try
  765. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  766. AItem := fQueue.Extract;
  767. Dec(FQueueSize);
  768. Inc(fTotalItemsPopped);
  769. finally
  770. fQueueLock.Leave;
  771. end;
  772. finally
  773. AQueueSize := fQueueSize;
  774. end;
  775. end;
  776. {$ELSE}
  777. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  778. begin
  779. AItem := Default(T);
  780. fQueueLock.Enter;
  781. try
  782. Result := wrSignaled;
  783. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  784. begin
  785. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  786. end;
  787. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  788. AItem := fQueue.Extract;
  789. if fQueueSize = fQueue.Count then
  790. begin
  791. FQueueCondVar.ReleaseAll;
  792. end;
  793. Dec(FQueueSize);
  794. Inc(fTotalItemsPopped);
  795. finally
  796. AQueueSize := fQueueSize;
  797. fQueueLock.Leave;
  798. end;
  799. end;
  800. {$ENDIF}
  801. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  802. var
  803. LQueueSize: Integer;
  804. begin
  805. Result := PopItem(LQueueSize, AItem);
  806. end;
  807. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  808. begin
  809. PopItem(AQueueSize, Result);
  810. end;
  811. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  812. var
  813. LQueueSize: Integer;
  814. begin
  815. Result := PushItem(AItem, LQueueSize);
  816. end;
  817. {$IFDEF FPC}
  818. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  819. begin
  820. FQueueLock.Enter;
  821. try
  822. if FQueueSize >= fQueue.Count then Grow(10);
  823. Result := wrSignaled;
  824. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  825. //begin
  826. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  827. //end;
  828. if fShutDown or (Result <> wrSignaled) then Exit;
  829. //if fQueueSize = 0 then
  830. //begin
  831. // FQueueCondVar.SetEvent;
  832. //end;
  833. fQueue.Enqueue(AItem);
  834. Inc(FQueueSize);
  835. Inc(fTotalItemsPushed);
  836. finally
  837. AQueueSize := fQueueSize;
  838. FQueueLock.Leave;
  839. //FQueueCondVar.SetEvent;
  840. end;
  841. end;
  842. {$ELSE}
  843. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  844. begin
  845. FQueueLock.Enter;
  846. try
  847. Result := wrSignaled;
  848. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  849. //begin
  850. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  851. //end;
  852. if fShutDown or (Result <> wrSignaled) then Exit;
  853. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  854. fQueue.Enqueue(AItem);
  855. Inc(FQueueSize);
  856. Inc(fTotalItemsPushed);
  857. finally
  858. AQueueSize := fQueueSize;
  859. FQueueLock.Leave;
  860. end;
  861. end;
  862. {$ENDIF}
  863. procedure TThreadedQueueList<T>.DoShutDown;
  864. begin
  865. fShutDown := True;
  866. {$IFDEF FPC}
  867. FQueueCondVar.SetEvent;
  868. {$ELSE}
  869. FQueueCondVar.ReleaseAll;
  870. {$ENDIF}
  871. end;
  872. {$IFNDEF FPC}
  873. { TThreadObjectList<T> }
  874. procedure TThreadObjectList<T>.Add(const Item: T);
  875. begin
  876. LockList;
  877. try
  878. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  879. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  880. finally
  881. UnlockList;
  882. end;
  883. end;
  884. procedure TThreadObjectList<T>.Clear;
  885. begin
  886. LockList;
  887. try
  888. fList.Clear;
  889. finally
  890. UnlockList;
  891. end;
  892. end;
  893. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  894. begin
  895. inherited Create;
  896. fLock := TObject.Create;
  897. fList := TObjectList<T>.Create;
  898. fDuplicates := dupIgnore;
  899. end;
  900. destructor TThreadObjectList<T>.Destroy;
  901. begin
  902. LockList;
  903. try
  904. fList.Free;
  905. inherited Destroy;
  906. finally
  907. UnlockList;
  908. fLock.Free;
  909. end;
  910. end;
  911. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  912. begin
  913. LockList;
  914. try
  915. Result := fList[aIndex];
  916. finally
  917. UnlockList;
  918. end;
  919. end;
  920. function TThreadObjectList<T>.LockList: TObjectList<T>;
  921. begin
  922. System.TMonitor.Enter(fLock);
  923. Result := fList;
  924. end;
  925. procedure TThreadObjectList<T>.Remove(const Item: T);
  926. begin
  927. RemoveItem(Item, TDirection.FromBeginning);
  928. end;
  929. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  930. begin
  931. LockList;
  932. try
  933. fList.RemoveItem(Item, Direction);
  934. finally
  935. UnlockList;
  936. end;
  937. end;
  938. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  939. begin
  940. LockList;
  941. try
  942. fList[aIndex] := aValue;
  943. finally
  944. UnlockList;
  945. end;
  946. end;
  947. procedure TThreadObjectList<T>.UnlockList;
  948. begin
  949. System.TMonitor.Exit(fLock);
  950. end;
  951. {$ENDIF}
  952. { TAnonymousThread }
  953. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  954. begin
  955. fThread := TAdvThread.Create(aProc,aSynchronize);
  956. end;
  957. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  958. begin
  959. Result := TAnonymousThread.Create(aProc,False);
  960. end;
  961. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  962. begin
  963. Result := TAnonymousThread.Create(aProc,True);
  964. end;
  965. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  966. begin
  967. Result := Self;
  968. fThread.OnException(aProc);
  969. end;
  970. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  971. begin
  972. Result := Self;
  973. fThread.OnTerminate(aProc,False);
  974. end;
  975. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  976. begin
  977. Result := Self;
  978. fThread.OnTerminate(aProc,True);
  979. end;
  980. procedure TAnonymousThread.Start;
  981. begin
  982. fThread.Start;
  983. end;
  984. { TTask }
  985. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  986. var
  987. i : Integer;
  988. begin
  989. fTaskStatus := TWorkTaskStatus.wtsPending;
  990. fCustomFaultPolicy := False;
  991. fNumRetries := 0;
  992. fExecuteWithSync := False;
  993. fTerminateWithSync := False;
  994. fExceptionWithSync := False;
  995. fFaultControl := TFaultControl.Create;
  996. fFaultControl.OnRetry := DoRetry;
  997. fOwnedParams := aOwnedParams;
  998. fParamList := TParamList.Create(True);
  999. for i := Low(aParamArray) to High(aParamArray) do
  1000. begin
  1001. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  1002. {$IFDEF FPC}
  1003. fParamList[i].Value._AddRef;
  1004. {$ENDIF}
  1005. end;
  1006. fExecuteProc := aTaskProc;
  1007. fEnabled := False;
  1008. end;
  1009. destructor TTask.Destroy;
  1010. begin
  1011. fFaultControl.Free;
  1012. //free passed params
  1013. fParamList.Free;
  1014. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1015. inherited;
  1016. end;
  1017. procedure TTask.DoException(aException : Exception);
  1018. begin
  1019. fTaskStatus := TWorkTaskStatus.wtsException;
  1020. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1021. else raise aException;
  1022. end;
  1023. procedure TTask.DoExecute;
  1024. begin
  1025. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1026. DoInitialize;
  1027. repeat
  1028. try
  1029. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1030. fTaskStatus := TWorkTaskStatus.wtsDone;
  1031. fFaultControl.SuccessExecution;
  1032. except
  1033. on E : Exception do
  1034. begin
  1035. fTaskStatus := TWorkTaskStatus.wtsException;
  1036. {$IFNDEF FPC}
  1037. {$IF DELPHIRX10_UP}
  1038. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1039. {$ELSE}
  1040. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1041. {$ENDIF}
  1042. {$ELSE}
  1043. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1044. {$ENDIF}
  1045. end;
  1046. end;
  1047. until not fFaultControl.NeedToRetry;
  1048. end;
  1049. procedure TTask.DoInitialize;
  1050. begin
  1051. try
  1052. fFaultControl.Reset;
  1053. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1054. except
  1055. on E : Exception do
  1056. begin
  1057. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1058. end;
  1059. end;
  1060. end;
  1061. function TTask.Done: Boolean;
  1062. begin
  1063. Result := not fFaultControl.TaskFailed;
  1064. end;
  1065. function TTask.Failed: Boolean;
  1066. begin
  1067. Result := fFaultControl.TaskFailed;
  1068. end;
  1069. function TTask.CircuitBreaked: Boolean;
  1070. begin
  1071. Result := fFaultControl.CircuitBreaked;
  1072. end;
  1073. function TTask.LastException: Exception;
  1074. begin
  1075. Result := fFaultControl.LastException;
  1076. end;
  1077. function TTask.MaxRetries: Integer;
  1078. begin
  1079. Result := fFaultControl.MaxRetries;
  1080. end;
  1081. function TTask.NumRetries: Integer;
  1082. begin
  1083. Result := fFaultControl.NumRetries;
  1084. end;
  1085. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1086. begin
  1087. vStopRetries := False;
  1088. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1089. end;
  1090. procedure TTask.DoTerminate;
  1091. begin
  1092. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1093. end;
  1094. function TTask.GetIdTask: Int64;
  1095. begin
  1096. Result := fIdTask;
  1097. end;
  1098. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1099. begin
  1100. {$IFDEF FPC}
  1101. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1102. {$ENDIF}
  1103. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1104. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1105. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1106. end;
  1107. procedure TTask.SetIdTask(Value : Int64);
  1108. begin
  1109. fIdTask := Value;
  1110. end;
  1111. function TTask.GetNumWorker: Integer;
  1112. begin
  1113. Result := fNumWorker;
  1114. end;
  1115. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1116. begin
  1117. Result := fParamList[aIndex].Value;
  1118. end;
  1119. function TTask.GetParam(const aName: string): TFlexValue;
  1120. var
  1121. paramvalue : TParamValue;
  1122. begin
  1123. for paramvalue in fParamList do
  1124. begin
  1125. if CompareText(paramvalue.Name,aName) = 0 then
  1126. begin
  1127. Exit(paramvalue.Value)
  1128. end;
  1129. end;
  1130. //if not exists
  1131. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1132. end;
  1133. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1134. begin
  1135. Result := @fParamList[aIndex].Value;
  1136. end;
  1137. function TTask.GetResult: TFlexValue;
  1138. begin
  1139. Result := fResult;
  1140. end;
  1141. function TTask.IsEnabled: Boolean;
  1142. begin
  1143. Result := fEnabled;
  1144. end;
  1145. procedure TTask.SetNumWorker(Value: Integer);
  1146. begin
  1147. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1148. fNumWorker := Value;
  1149. end;
  1150. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1151. begin
  1152. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1153. fParamList[aIndex].Value := Value;
  1154. end;
  1155. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1156. var
  1157. paramvalue : TParamValue;
  1158. begin
  1159. //check if already exists parameter
  1160. for paramvalue in fParamList do
  1161. begin
  1162. if CompareText(paramvalue.Name,aName) = 0 then
  1163. begin
  1164. paramvalue.Value := Value;
  1165. Exit;
  1166. end;
  1167. end;
  1168. //if not exists, create one
  1169. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1170. end;
  1171. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1172. begin
  1173. SetParam(aName,Value,False);
  1174. end;
  1175. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1176. begin
  1177. fFaultControl.MaxRetries := aMaxRetries;
  1178. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1179. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1180. fCustomFaultPolicy := True;
  1181. end;
  1182. procedure TTask.SetResult(aValue: TFlexValue);
  1183. begin
  1184. fResult := aValue;
  1185. end;
  1186. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1187. begin
  1188. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1189. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1190. fFaultControl.WaitTimeMultiplierFactor := 1;
  1191. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1192. fCustomFaultPolicy := True;
  1193. end;
  1194. function TTask.TaskStatus: TWorkTaskStatus;
  1195. begin
  1196. Result := fTaskStatus;
  1197. end;
  1198. { TWorkTask }
  1199. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1200. begin
  1201. fExceptProc := aTaskProc;
  1202. Result := Self;
  1203. end;
  1204. function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  1205. begin
  1206. fExceptionWithSync := True;
  1207. Result := OnException(aTaskProc);
  1208. end;
  1209. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1210. begin
  1211. fInitializeProc := aTaskProc;
  1212. Result := Self;
  1213. end;
  1214. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1215. begin
  1216. fRetryProc := aTaskProc;
  1217. Result := Self;
  1218. end;
  1219. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1220. begin
  1221. fTerminateProc := aTaskProc;
  1222. Result := Self;
  1223. end;
  1224. function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  1225. begin
  1226. fTerminateWithSync := True;
  1227. Result := OnTerminated(aTaskProc);
  1228. end;
  1229. procedure TWorkTask.Run;
  1230. begin
  1231. fEnabled := True;
  1232. end;
  1233. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1234. begin
  1235. Result := Self;
  1236. SetParam(aName,aValue);
  1237. end;
  1238. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1239. begin
  1240. Result := Self;
  1241. SetParam(aName,aValue,aOwned);
  1242. end;
  1243. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1244. begin
  1245. Result := Self;
  1246. SetRetryPolicy(aMaxRetries,0,1);
  1247. end;
  1248. function TWorkTask.RetryForever: IWorkTask;
  1249. begin
  1250. Result := Self;
  1251. SetRetryPolicy(-1,0,1);
  1252. end;
  1253. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1254. begin
  1255. Result := Self;
  1256. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1257. end;
  1258. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1259. begin
  1260. Result := Self;
  1261. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1262. end;
  1263. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1264. begin
  1265. Result := Self;
  1266. SetRetryPolicy(aWaitTimeArray);
  1267. end;
  1268. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1269. begin
  1270. Result := Self;
  1271. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1272. end;
  1273. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1274. begin
  1275. Result := Self;
  1276. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1277. end;
  1278. { TBackgroundTasks }
  1279. procedure TBackgroundTasks.CancelAll;
  1280. begin
  1281. fTaskQueue.Clear;
  1282. end;
  1283. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  1284. begin
  1285. fMaxQueue := 0;
  1286. fConcurrentWorkers := aConcurrentWorkers;
  1287. fInsertTimeout := INFINITE;
  1288. fExtractTimeout := 5000;
  1289. fNumPushedTasks := 0;
  1290. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  1291. end;
  1292. destructor TBackgroundTasks.Destroy;
  1293. begin
  1294. CancelAll;
  1295. fTaskQueue.DoShutDown;
  1296. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1297. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1298. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1299. inherited;
  1300. end;
  1301. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1302. begin
  1303. Result := fTaskQueue.QueueSize;
  1304. end;
  1305. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1306. begin
  1307. Result := AddTask([],False,aTaskProc);
  1308. end;
  1309. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1310. var
  1311. worktask : IWorkTask;
  1312. begin
  1313. if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
  1314. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1315. Inc(fNumPushedTasks);
  1316. worktask.SetIdTask(fNumPushedTasks);
  1317. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1318. begin
  1319. Result := worktask;
  1320. end;
  1321. end;
  1322. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1323. begin
  1324. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1325. TTask(Result).ExecuteWithSync := True;
  1326. end;
  1327. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1328. begin
  1329. Result := AddTask_Sync([],False,aTaskProc);
  1330. end;
  1331. procedure TBackgroundTasks.Start;
  1332. var
  1333. i : Integer;
  1334. worker : TWorker;
  1335. begin
  1336. //create workers
  1337. if fWorkerPool <> nil then fWorkerPool.Free;
  1338. fWorkerPool := TWorkerPool.Create(True);
  1339. for i := 1 to fConcurrentWorkers do
  1340. begin
  1341. worker := TQueueWorker.Create(i,fTaskQueue);
  1342. fWorkerPool.Add(worker);
  1343. worker.Start;
  1344. end;
  1345. end;
  1346. { TWorker }
  1347. constructor TWorker.Create;
  1348. begin
  1349. inherited Create(True);
  1350. fDefaultFaultPolicy := TFaultPolicy.Create;
  1351. fStatus := TWorkerStatus.wsSuspended;
  1352. FreeOnTerminate := False;
  1353. end;
  1354. destructor TWorker.Destroy;
  1355. begin
  1356. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1357. inherited;
  1358. end;
  1359. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1360. begin
  1361. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1362. end;
  1363. procedure TWorker.Execute;
  1364. begin
  1365. end;
  1366. procedure TWorker.ExecuteTask;
  1367. begin
  1368. fCurrentTask.DoExecute;
  1369. end;
  1370. procedure TWorker.TerminateTask;
  1371. begin
  1372. fCurrentTask.DoTerminate;
  1373. end;
  1374. { TSimpleWorker }
  1375. constructor TSimpleWorker.Create(aTask : ITask);
  1376. begin
  1377. inherited Create;
  1378. fCurrentTask := aTask;
  1379. FreeOnTerminate := True;
  1380. end;
  1381. procedure TSimpleWorker.Execute;
  1382. begin
  1383. fStatus := TWorkerStatus.wsIdle;
  1384. while not Terminated do
  1385. begin
  1386. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1387. try
  1388. fStatus := TWorkerStatus.wsWorking;
  1389. try
  1390. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1391. else fCurrentTask.DoExecute;
  1392. except
  1393. on E : Exception do
  1394. begin
  1395. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1396. else raise ETaskExecutionError.Create(e.Message);
  1397. end;
  1398. end;
  1399. finally
  1400. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1401. else fCurrentTask.DoTerminate;
  1402. fStatus := TWorkerStatus.wsIdle;
  1403. Terminate;
  1404. end;
  1405. end;
  1406. fStatus := TWorkerStatus.wsSuspended
  1407. end;
  1408. { TQueueWorker }
  1409. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1410. begin
  1411. inherited Create;
  1412. fNumWorker := aNumWorker;
  1413. fTaskQueue := aTaskQueue;
  1414. end;
  1415. procedure TQueueWorker.Execute;
  1416. begin
  1417. fStatus := TWorkerStatus.wsIdle;
  1418. while not Terminated do
  1419. begin
  1420. fCurrentTask := fTaskQueue.PopItem;
  1421. if fCurrentTask <> nil then
  1422. try
  1423. fStatus := TWorkerStatus.wsWorking;
  1424. try
  1425. fCurrentIdTask := fCurrentTask.GetIdTask;
  1426. SetFaultPolicy(TTask(fCurrentTask));
  1427. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1428. else fCurrentTask.DoExecute;
  1429. except
  1430. on E : Exception do
  1431. begin
  1432. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1433. else raise ETaskExecutionError.Create(e.Message);
  1434. end;
  1435. end;
  1436. finally
  1437. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1438. else fCurrentTask.DoTerminate;
  1439. fStatus := TWorkerStatus.wsIdle;
  1440. end;
  1441. end;
  1442. fStatus := TWorkerStatus.wsSuspended
  1443. end;
  1444. { TScheduledWorker }
  1445. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1446. begin
  1447. inherited Create;
  1448. {$IFNDEF DELPHILINUX}
  1449. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1450. {$ENDIF}
  1451. FreeOnTerminate := True;
  1452. fCurrentTask := aScheduledTask;
  1453. end;
  1454. procedure TScheduledWorker.Execute;
  1455. begin
  1456. fStatus := TWorkerStatus.wsIdle;
  1457. if Assigned(fCurrentTask) then
  1458. begin
  1459. try
  1460. fStatus := TWorkerStatus.wsWorking;
  1461. try
  1462. SetFaultPolicy(TTask(fCurrentTask));
  1463. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1464. else fCurrentTask.DoExecute;
  1465. fStatus := TWorkerStatus.wsIdle;
  1466. except
  1467. on E : Exception do
  1468. begin
  1469. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1470. else raise ETaskExecutionError.Create(e.Message);
  1471. end;
  1472. end;
  1473. finally
  1474. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1475. else fCurrentTask.DoTerminate;
  1476. //check if expired
  1477. if (fCurrentTask as IScheduledTask).IsFinished then
  1478. begin
  1479. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1480. else (fCurrentTask as IScheduledTask).DoExpire;
  1481. end;
  1482. end;
  1483. end;
  1484. fCurrentTask := nil;
  1485. fStatus := TWorkerStatus.wsSuspended;
  1486. end;
  1487. procedure TScheduledWorker.ExpireTask;
  1488. begin
  1489. (fCurrentTask as IScheduledTask).DoExpire;
  1490. end;
  1491. { TScheduledTasks }
  1492. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1493. begin
  1494. Result := AddTask(aTaskName,[],False,aTaskProc);
  1495. end;
  1496. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1497. var
  1498. scheduletask : TScheduledTask;
  1499. begin
  1500. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1501. scheduletask.Name := aTaskName;
  1502. Inc(fNumPushedTasks);
  1503. scheduletask.SetIdTask(fNumPushedTasks);
  1504. fTaskList.Add(scheduletask);
  1505. Result := scheduletask;
  1506. end;
  1507. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1508. begin
  1509. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1510. TTask(Result).ExecuteWithSync := True;
  1511. end;
  1512. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1513. begin
  1514. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1515. end;
  1516. constructor TScheduledTasks.Create;
  1517. begin
  1518. fNumPushedTasks := 0;
  1519. fIsStarted := False;
  1520. fFaultPolicy := TFaultPolicy.Create;
  1521. fTaskList := TScheduledTaskList.Create;
  1522. end;
  1523. destructor TScheduledTasks.Destroy;
  1524. begin
  1525. if Assigned(fScheduler) then
  1526. begin
  1527. fScheduler.Terminate;
  1528. fScheduler.WaitFor;
  1529. fScheduler.Free;
  1530. end;
  1531. if Assigned(fTaskList) then fTaskList.Free;
  1532. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1533. inherited;
  1534. end;
  1535. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1536. begin
  1537. Result := fScheduler.Get(aIdTask);
  1538. end;
  1539. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1540. begin
  1541. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1542. Result := fScheduler.Get(aTaskName);
  1543. end;
  1544. procedure TScheduledTasks.Start;
  1545. begin
  1546. if fIsStarted then Exit;
  1547. if not Assigned(fScheduler) then
  1548. begin
  1549. fScheduler := TScheduler.Create(fTaskList);
  1550. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1551. end;
  1552. fScheduler.Start;
  1553. fIsStarted := True;
  1554. end;
  1555. procedure TScheduledTasks.Stop;
  1556. begin
  1557. if Assigned(fScheduler) then fScheduler.Terminate;
  1558. fIsStarted := False;
  1559. end;
  1560. { TScheduledTask }
  1561. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1562. begin
  1563. Result := Self;
  1564. SetParam(aName,aValue);
  1565. end;
  1566. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1567. begin
  1568. Result := Self;
  1569. SetParam(aName,aValue);
  1570. end;
  1571. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1572. begin
  1573. Result := Self;
  1574. ClearSchedule;
  1575. fScheduleMode := TScheduleMode.smRunOnce;
  1576. fStartDate := aStartDate;
  1577. fNextExecution := aStartDate;
  1578. end;
  1579. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1580. begin
  1581. Result := Self;
  1582. ClearSchedule;
  1583. fScheduleMode := TScheduleMode.smRunOnce;
  1584. fStartDate := IncMinute(Now(),aMinutes);
  1585. fNextExecution := fStartDate;
  1586. end;
  1587. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1588. begin
  1589. Result := Self;
  1590. ClearSchedule;
  1591. fScheduleMode := TScheduleMode.smRunOnce;
  1592. fStartDate := IncSecond(Now(),aSeconds);
  1593. fNextExecution := fStartDate;
  1594. end;
  1595. function TScheduledTask.StartNow: IScheduledTask;
  1596. begin
  1597. Result := Self;
  1598. ClearSchedule;
  1599. fScheduleMode := TScheduleMode.smRunOnce;
  1600. fStartDate := Now();
  1601. fNextExecution := fStartDate;
  1602. end;
  1603. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1604. begin
  1605. Result := Self;
  1606. ClearSchedule;
  1607. fScheduleMode := TScheduleMode.smRunOnce;
  1608. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1609. fNextExecution := fStartDate;
  1610. end;
  1611. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1612. begin
  1613. Result := Self;
  1614. ClearSchedule;
  1615. fScheduleMode := TScheduleMode.smRunOnce;
  1616. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1617. fNextExecution := fStartDate;
  1618. end;
  1619. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1620. begin
  1621. Result := Self;
  1622. ClearSchedule;
  1623. fScheduleMode := TScheduleMode.smRunOnce;
  1624. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1625. fNextExecution := fStartDate;
  1626. end;
  1627. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1628. begin
  1629. Result := Self;
  1630. SetRetryPolicy(aMaxRetries,0,1);
  1631. end;
  1632. function TScheduledTask.RetryForever: IScheduledTask;
  1633. begin
  1634. Result := Self;
  1635. SetRetryPolicy(-1,0,1);
  1636. end;
  1637. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1638. begin
  1639. Result := Self;
  1640. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1641. end;
  1642. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1643. begin
  1644. Result := Self;
  1645. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1646. end;
  1647. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1648. begin
  1649. Result := Self;
  1650. SetRetryPolicy(aWaitTimeArray);
  1651. end;
  1652. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1653. begin
  1654. Result := Self;
  1655. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1656. end;
  1657. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1658. begin
  1659. Result := Self;
  1660. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1661. end;
  1662. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1663. begin
  1664. if fStartDate = 0.0 then ClearSchedule;
  1665. fScheduleMode := TScheduleMode.smRepeatMode;
  1666. fTimeMeasure := aTimeMeasure;
  1667. fTimeInterval := aInterval;
  1668. if fStartDate < Now() then fStartDate := Now();
  1669. fNextExecution := fStartDate;
  1670. fEnabled := True;
  1671. end;
  1672. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1673. begin
  1674. if fStartDate = 0.0 then ClearSchedule;
  1675. fScheduleMode := TScheduleMode.smRepeatMode;
  1676. fTimeMeasure := aTimeMeasure;
  1677. fTimeInterval := aInterval;
  1678. if fStartDate < Now() then fStartDate := Now();
  1679. fExpirationDate := aEndTime;
  1680. fNextExecution := fStartDate;
  1681. fEnabled := True;
  1682. end;
  1683. procedure TScheduledTask.RepeatEveryDay;
  1684. begin
  1685. RepeatEvery(1,tmDays);
  1686. end;
  1687. procedure TScheduledTask.RepeatEveryWeek;
  1688. begin
  1689. RepeatEvery(7,tmDays);
  1690. end;
  1691. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1692. begin
  1693. if fStartDate = 0.0 then ClearSchedule;
  1694. fScheduleMode := TScheduleMode.smRepeatMode;
  1695. fTimeMeasure := aTimeMeasure;
  1696. fTimeInterval := aInterval;
  1697. if fStartDate < Now() then fStartDate := Now();
  1698. fExpirationTimes := aRepeatTimes;
  1699. fNextExecution := fStartDate;
  1700. fEnabled := True;
  1701. end;
  1702. procedure TScheduledTask.RunOnce;
  1703. begin
  1704. fScheduleMode := TScheduleMode.smRunOnce;
  1705. if fStartDate < Now() then fStartDate := Now();
  1706. fNextExecution := fStartDate;
  1707. fEnabled := True;
  1708. end;
  1709. procedure TScheduledTask.Cancel;
  1710. begin
  1711. fFinished := True;
  1712. end;
  1713. function TScheduledTask.CheckSchedule: Boolean;
  1714. begin
  1715. Result := False;
  1716. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1717. if fScheduleMode = TScheduleMode.smRunOnce then
  1718. begin
  1719. //if start date reached
  1720. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1721. begin
  1722. fLastExecution := Now();
  1723. Inc(fExecutionTimes);
  1724. fFinished := True;
  1725. Result := True;
  1726. end;
  1727. end
  1728. else
  1729. begin
  1730. //if next execution reached
  1731. if Now() >= fNextExecution then
  1732. begin
  1733. //check expiration limits
  1734. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1735. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1736. begin
  1737. fFinished := True;
  1738. Exit;
  1739. end;
  1740. //calculate next execution
  1741. case fTimeMeasure of
  1742. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1743. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1744. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1745. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1746. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1747. end;
  1748. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1749. else
  1750. begin
  1751. fLastExecution := Now();
  1752. Inc(fExecutionTimes);
  1753. Result := True;
  1754. end;
  1755. end;
  1756. end;
  1757. end;
  1758. procedure TScheduledTask.ClearSchedule;
  1759. begin
  1760. inherited;
  1761. fFinished := False;
  1762. fStartDate := 0.0;
  1763. fLastExecution := 0.0;
  1764. fNextExecution := 0.0;
  1765. fExpirationDate := 0.0;
  1766. fScheduleMode := TScheduleMode.smRunOnce;
  1767. fTimeMeasure := TTimeMeasure.tmSeconds;
  1768. fTimeInterval := 0;
  1769. end;
  1770. procedure TScheduledTask.DoExpire;
  1771. begin
  1772. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1773. fEnabled := False;
  1774. end;
  1775. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1776. begin
  1777. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1778. end;
  1779. function TScheduledTask.GetTaskName: string;
  1780. begin
  1781. Result := fName;
  1782. end;
  1783. function TScheduledTask.IsFinished: Boolean;
  1784. begin
  1785. Result := fFinished;
  1786. end;
  1787. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1788. begin
  1789. fExceptProc := aTaskProc;
  1790. Result := Self;
  1791. end;
  1792. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1793. begin
  1794. Result := OnException(aTaskProc);
  1795. TTask(Result).ExceptionWithSync := True;
  1796. end;
  1797. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1798. begin
  1799. fRetryProc := aTaskProc;
  1800. Result := Self;
  1801. end;
  1802. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1803. begin
  1804. fExpiredProc := aTaskProc;
  1805. Result := Self;
  1806. end;
  1807. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1808. begin
  1809. Result := OnExpired(aTaskProc);
  1810. TScheduledTask(Result).ExpireWithSync := True;
  1811. end;
  1812. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1813. begin
  1814. fInitializeProc := aTaskProc;
  1815. Result := Self;
  1816. end;
  1817. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1818. begin
  1819. fTerminateProc := aTaskProc;
  1820. Result := Self;
  1821. end;
  1822. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1823. begin
  1824. Result := OnTerminated(aTaskProc);
  1825. TTask(Result).TerminateWithSync := True;
  1826. end;
  1827. { TScheduler }
  1828. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1829. begin
  1830. inherited Create(True);
  1831. FreeOnTerminate := False;
  1832. fListLock := TCriticalSection.Create;
  1833. fRemoveTaskAfterExpiration := False;
  1834. fTaskList := aTaskList;
  1835. {$IFDEF FPC}
  1836. fCondVar := TSimpleEvent.Create;
  1837. {$ELSE}
  1838. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1839. {$ENDIF}
  1840. end;
  1841. destructor TScheduler.Destroy;
  1842. begin
  1843. fCondVar.SetEvent;
  1844. fCondVar.Free;
  1845. fTaskList := nil;
  1846. fListLock.Free;
  1847. inherited;
  1848. end;
  1849. procedure TScheduler.Execute;
  1850. var
  1851. task : IScheduledTask;
  1852. worker : TScheduledWorker;
  1853. numworker : Int64;
  1854. begin
  1855. numworker := 0;
  1856. while not Terminated do
  1857. begin
  1858. fListLock.Enter;
  1859. try
  1860. for task in fTaskList do
  1861. begin
  1862. if (task.IsEnabled) and (not task.IsFinished) then
  1863. begin
  1864. if task.CheckSchedule then
  1865. begin
  1866. Inc(numworker);
  1867. worker := TScheduledWorker.Create(numworker,task);
  1868. worker.Start;
  1869. end;
  1870. end
  1871. else
  1872. begin
  1873. if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then fTaskList.Remove(task);
  1874. end;
  1875. end;
  1876. task := nil;
  1877. finally
  1878. fListLock.Leave;
  1879. end;
  1880. fCondVar.WaitFor(250);
  1881. end;
  1882. end;
  1883. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1884. begin
  1885. Result := fTaskList.Add(aTask);
  1886. end;
  1887. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1888. var
  1889. task : IScheduledTask;
  1890. begin
  1891. fListLock.Enter;
  1892. try
  1893. for task in fTaskList do
  1894. begin
  1895. if task.IdTask = aIdTask then Exit(task);
  1896. end;
  1897. finally
  1898. fListLock.Leave;
  1899. end;
  1900. end;
  1901. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1902. var
  1903. task : IScheduledTask;
  1904. begin
  1905. fListLock.Enter;
  1906. try
  1907. for task in fTaskList do
  1908. begin
  1909. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1910. end;
  1911. finally
  1912. fListLock.Leave;
  1913. end;
  1914. end;
  1915. { TAdvThread }
  1916. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1917. begin
  1918. inherited Create(True);
  1919. FreeOnTerminate := True;
  1920. fExecuteWithSync := aSynchronize;
  1921. fExecuteProc := aProc;
  1922. end;
  1923. procedure TAdvThread.DoExecute;
  1924. begin
  1925. try
  1926. if Assigned(fExecuteProc) then fExecuteProc;
  1927. except
  1928. on E : Exception do
  1929. begin
  1930. {$IFNDEF FPC}
  1931. {$IF DELPHIRX10_UP}
  1932. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  1933. {$ELSE}
  1934. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1935. {$ENDIF}
  1936. {$ELSE}
  1937. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1938. {$ENDIF}
  1939. else raise e;
  1940. end;
  1941. end;
  1942. end;
  1943. procedure TAdvThread.CallToTerminate;
  1944. begin
  1945. if Assigned(fTerminateProc) then fTerminateProc;
  1946. end;
  1947. procedure TAdvThread.DoTerminate;
  1948. begin
  1949. if fTerminateWithSync then Synchronize(CallToTerminate)
  1950. else CallToTerminate;
  1951. end;
  1952. procedure TAdvThread.Execute;
  1953. begin
  1954. if fExecuteWithSync then Synchronize(DoExecute)
  1955. else DoExecute;
  1956. end;
  1957. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  1958. begin
  1959. fExceptionProc := aProc;
  1960. end;
  1961. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1962. begin
  1963. fTerminateWithSync := aSynchronize;
  1964. fTerminateProc := aProc;
  1965. end;
  1966. { TRunTask }
  1967. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  1968. begin
  1969. Result := Execute([],False,aTaskProc);
  1970. end;
  1971. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  1972. begin
  1973. Result := Execute_Sync([],False,aTaskProc);
  1974. end;
  1975. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1976. var
  1977. task : TWorkTask;
  1978. worker : TSimpleWorker;
  1979. begin
  1980. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1981. task.ExecuteWithSync := False;
  1982. Result := task;
  1983. worker := TSimpleWorker.Create(task);
  1984. worker.Start;
  1985. end;
  1986. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1987. var
  1988. task : TWorkTask;
  1989. worker : TSimpleWorker;
  1990. begin
  1991. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1992. task.ExecuteWithSync := True;
  1993. Result := task;
  1994. worker := TSimpleWorker.Create(task);
  1995. worker.Start;
  1996. end;
  1997. { TParamValue }
  1998. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  1999. begin
  2000. inherited Create;
  2001. fName := aName;
  2002. fValue := aValue;
  2003. fOwned := aOwnedValue;
  2004. end;
  2005. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  2006. begin
  2007. inherited Create;
  2008. fName := aName;
  2009. fValue := aValue;
  2010. fOwned := aOwnedValue;
  2011. end;
  2012. constructor TParamValue.Create;
  2013. begin
  2014. fName := '';
  2015. fOwned := False;
  2016. end;
  2017. destructor TParamValue.Destroy;
  2018. begin
  2019. {$IFDEF FPC}
  2020. fValue._Release;
  2021. {$ENDIF}
  2022. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  2023. inherited;
  2024. end;
  2025. end.