Quick.Threads.pas 67 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2021 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 08/02/2021
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TAdvThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fExceptionProc : TAnonExceptionProc;
  137. fTerminateProc : TProc;
  138. fExecuteWithSync : Boolean;
  139. fTerminateWithSync : Boolean;
  140. procedure DoExecute;
  141. procedure CallToTerminate;
  142. protected
  143. procedure DoTerminate; override;
  144. public
  145. constructor Create(aProc : TProc; aSynchronize : Boolean);
  146. procedure OnException(aProc : TAnonExceptionProc);
  147. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  148. procedure Execute; override;
  149. end;
  150. IAnonymousThread = interface
  151. procedure Start;
  152. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  153. function OnTerminate(aProc : TProc) : IAnonymousThread;
  154. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  155. end;
  156. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  157. private
  158. fThread : TAdvThread;
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  160. public
  161. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  162. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  163. procedure Start;
  164. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  165. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  166. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  167. end;
  168. TParamValue = class
  169. private
  170. fName : string;
  171. fValue : TFlexValue;
  172. fOwned : Boolean;
  173. public
  174. constructor Create; overload;
  175. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  176. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  177. destructor Destroy; override;
  178. property Name : string read fName write fName;
  179. property Value : TFlexValue read fValue write fValue;
  180. property Owned : Boolean read fOwned write fOwned;
  181. end;
  182. TParamList = TObjectList<TParamValue>;
  183. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  184. TScheduleMode = (smRunOnce, smRepeatMode);
  185. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  186. ETaskAddError = class(Exception);
  187. ETaskInitializationError = class(Exception);
  188. ETaskExecutionError = class(Exception);
  189. ETaskParamError = class(Exception);
  190. ETaskSchedulerError = class(Exception);
  191. ITask = interface
  192. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  193. function GetParam(aIndex : Integer) : TFlexValue; overload;
  194. function GetParam(const aName : string) : TFlexValue; overload;
  195. function GetParam2(aIndex : Integer) : PFlexValue;
  196. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  197. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  198. function TaskStatus : TWorkTaskStatus;
  199. function GetNumWorker : Integer;
  200. procedure SetNumWorker(Value : Integer);
  201. function GetIdTask : Int64;
  202. procedure SetIdTask(Value : Int64);
  203. function GetResult : TFlexValue;
  204. procedure SetResult(aValue : TFlexValue);
  205. procedure DoExecute;
  206. procedure DoException(aException : Exception);
  207. procedure DoTerminate;
  208. procedure Enable;
  209. procedure Disable;
  210. {$IFNDEF FPC}
  211. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  212. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  213. {$ELSE}
  214. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  215. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  216. {$ENDIF}
  217. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  218. property Result : TFlexValue read GetResult write SetResult;
  219. property IdTask : Int64 read GetIdTask;
  220. function Done : Boolean;
  221. function Failed : Boolean;
  222. function NumRetries : Integer;
  223. function MaxRetries : Integer;
  224. function LastException : Exception;
  225. function CircuitBreaked : Boolean;
  226. function IsEnabled : Boolean;
  227. end;
  228. {$IFNDEF FPC}
  229. TTaskProc = reference to procedure(task : ITask);
  230. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  231. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  232. {$ELSE}
  233. TTaskProc = procedure(task : ITask) of object;
  234. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  235. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  236. {$ENDIF}
  237. IWorkTask = interface(ITask)
  238. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  239. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  240. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  241. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  242. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  243. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  244. function Retry(aMaxRetries : Integer) : IWorkTask;
  245. function RetryForever : IWorkTask;
  246. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  247. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  248. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  249. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  250. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  251. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  252. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  253. procedure Run;
  254. end;
  255. IScheduledTask = interface(ITask)
  256. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  257. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  258. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  259. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  260. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  261. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  262. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  263. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  264. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  265. function Retry(aMaxRetries : Integer) : IScheduledTask;
  266. function RetryForever : IScheduledTask;
  267. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  268. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  269. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  270. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  271. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  272. function CheckSchedule : Boolean;
  273. procedure DoExpire;
  274. function GetTaskName : string;
  275. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  276. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  277. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  278. function StartOnDayChange : IScheduledTask;
  279. function StartNow : IScheduledTask;
  280. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  281. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  282. procedure RunOnce;
  283. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  284. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  285. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  286. procedure RepeatEveryDay;
  287. procedure RepeatEveryWeek;
  288. function IsFinished : Boolean;
  289. procedure Cancel;
  290. property Name : string read GetTaskName;
  291. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  292. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  293. end;
  294. TTask = class(TInterfacedObject,ITask)
  295. private
  296. fIdTask : Int64;
  297. fNumWorker : Integer;
  298. fNumRetries : Integer;
  299. fParamList : TParamList;
  300. fInitializeProc : TTaskProc;
  301. fExecuteProc : TTaskProc;
  302. fExceptProc : TTaskExceptionProc;
  303. fTerminateProc : TTaskProc;
  304. fExpiredProc : TTaskProc;
  305. fTaskStatus : TWorkTaskStatus;
  306. fOwnedParams : Boolean;
  307. fEnabled : Boolean;
  308. fExecuteWithSync : Boolean;
  309. fExceptionWithSync : Boolean;
  310. fRetryProc : TTaskRetryProc;
  311. fTerminateWithSync : Boolean;
  312. fFaultControl : TFaultControl;
  313. fCustomFaultPolicy : Boolean;
  314. fResult : TFlexValue;
  315. function GetParam(aIndex : Integer) : TFlexValue; overload;
  316. function GetParam(const aName : string) : TFlexValue; overload;
  317. function GetParam2(aIndex : Integer) : PFlexValue;
  318. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  319. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  320. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  321. procedure DoInitialize;
  322. procedure DoExecute;
  323. procedure DoException(aException : Exception);
  324. procedure DoTerminate;
  325. function GetNumWorker : Integer;
  326. procedure SetNumWorker(Value : Integer);
  327. function GetIdTask : Int64;
  328. procedure SetIdTask(Value : Int64);
  329. function GetResult : TFlexValue;
  330. procedure SetResult(aValue : TFlexValue);
  331. protected
  332. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  333. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  334. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  335. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  336. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  337. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  338. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  339. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  340. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  341. public
  342. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  343. destructor Destroy; override;
  344. property IdTask : Int64 read GetIdTask;
  345. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  346. function IsEnabled : Boolean;
  347. function TaskStatus : TWorkTaskStatus;
  348. function Done : Boolean;
  349. function Failed : Boolean;
  350. function NumRetries : Integer;
  351. function MaxRetries : Integer;
  352. function LastException : Exception;
  353. function CircuitBreaked : Boolean;
  354. procedure Disable;
  355. procedure Enable;
  356. end;
  357. TWorkTask = class(TTask,IWorkTask)
  358. public
  359. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  360. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  361. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  362. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  363. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  364. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  365. function Retry(aMaxRetries : Integer) : IWorkTask;
  366. function RetryForever : IWorkTask;
  367. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  368. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  369. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  370. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  371. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  372. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  373. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  374. procedure Run; virtual;
  375. end;
  376. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  377. TScheduledTask = class(TTask,IScheduledTask)
  378. private
  379. fName : string;
  380. fExecutionTimes : Integer;
  381. fScheduleMode : TScheduleMode;
  382. fTimeInterval : Integer;
  383. fTimeMeasure : TTimeMeasure;
  384. fStartDate : TDateTime;
  385. fLastExecution : TDateTime;
  386. fNextExecution : TDateTime;
  387. fExpirationDate : TDateTime;
  388. fExpirationTimes : Integer;
  389. fFinished : Boolean;
  390. fExpireWithSync: Boolean;
  391. procedure ClearSchedule;
  392. function CheckSchedule : Boolean;
  393. procedure DoExpire;
  394. function GetTaskName : string;
  395. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  396. protected
  397. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  398. public
  399. property Name : string read fName write fName;
  400. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  401. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  402. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  403. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  404. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  405. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  406. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  407. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  408. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  409. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  410. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  411. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  412. function StartOnDayChange : IScheduledTask;
  413. function StartNow : IScheduledTask;
  414. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  415. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  416. procedure RunOnce;
  417. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  418. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  419. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  420. procedure RepeatEveryDay;
  421. procedure RepeatEveryWeek;
  422. function Retry(aMaxRetries : Integer) : IScheduledTask;
  423. function RetryForever : IScheduledTask;
  424. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  425. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  426. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  427. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  428. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  429. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  430. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  431. function IsFinished : Boolean;
  432. procedure Cancel;
  433. end;
  434. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  435. TWorker = class(TThread)
  436. protected
  437. fStatus : TWorkerStatus;
  438. fCurrentTask : ITask;
  439. fDefaultFaultPolicy : TFaultPolicy;
  440. procedure ExecuteTask;
  441. procedure TerminateTask;
  442. public
  443. constructor Create;
  444. destructor Destroy; override;
  445. property Status : TWorkerStatus read fStatus;
  446. procedure SetFaultPolicy(aTask : TTask);
  447. procedure Execute; override;
  448. end;
  449. TSimpleWorker = class(TWorker)
  450. private
  451. fRunOnce : Boolean;
  452. public
  453. constructor Create(aTask : ITask; aRunOnce : Boolean = True);
  454. procedure Execute; override;
  455. end;
  456. TQueueWorker = class(TWorker)
  457. private
  458. fCurrentIdTask : Integer;
  459. fNumWorker : Integer;
  460. fTaskQueue : TTaskQueue;
  461. public
  462. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  463. property NumWorker : Integer read fNumWorker;
  464. procedure Execute; override;
  465. end;
  466. TScheduledWorker = class(TWorker)
  467. private
  468. procedure ExpireTask;
  469. public
  470. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  471. procedure Execute; override;
  472. end;
  473. TWorkerPool = TObjectList<TWorker>;
  474. TRunTask = class
  475. public
  476. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  477. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  478. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  479. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  480. end;
  481. TBackgroundTasks = class
  482. private
  483. fMaxQueue : Integer;
  484. fWorkerPool : TWorkerPool;
  485. fConcurrentWorkers : Integer;
  486. fInsertTimeout : Cardinal;
  487. fExtractTimeout : Cardinal;
  488. fTaskQueue : TTaskQueue;
  489. fNumPushedTasks : Int64;
  490. function GetTaskQueue : Cardinal;
  491. public
  492. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  493. destructor Destroy; override;
  494. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  495. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  496. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  497. property TaskQueued : Cardinal read GetTaskQueue;
  498. property NumPushedTasks : Int64 read fNumPushedTasks;
  499. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  500. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  501. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  502. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  503. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  504. procedure Start;
  505. procedure CancelAll;
  506. end;
  507. TScheduledTaskList = TList<IScheduledTask>;
  508. TScheduler = class(TThread)
  509. private
  510. fListLock : TCriticalSection;
  511. fCondVar : TSimpleEvent;
  512. fTaskList : TScheduledTaskList;
  513. fRemoveTaskAfterExpiration : Boolean;
  514. public
  515. constructor Create(aTaskList : TScheduledTaskList);
  516. destructor Destroy; override;
  517. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  518. procedure Execute; override;
  519. function Add(aTask : TScheduledTask) : Integer;
  520. function Get(aIdTask : Int64) : IScheduledTask; overload;
  521. function Get(const aTaskName : string) : IScheduledTask; overload;
  522. end;
  523. TScheduledTasks = class
  524. private
  525. fTaskList : TScheduledTaskList;
  526. fScheduler : TScheduler;
  527. fNumPushedTasks : Int64;
  528. fRemoveTaskAfterExpiration : Boolean;
  529. fIsStarted : Boolean;
  530. fFaultPolicy : TFaultPolicy;
  531. public
  532. constructor Create;
  533. destructor Destroy; override;
  534. property NumPushedTasks : Int64 read fNumPushedTasks;
  535. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  536. property IsStarted : Boolean read fIsStarted;
  537. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  538. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  539. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  540. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  541. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  542. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  543. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  544. procedure Start;
  545. procedure Stop;
  546. end;
  547. TBackgroundWorkers = class
  548. private
  549. fWorkerPool : TWorkerPool;
  550. fConcurrentWorkers : Integer;
  551. fWorkerTask : IWorkTask;
  552. public
  553. constructor Create(aConcurrentWorkers : Integer);
  554. destructor Destroy; override;
  555. property ConcurrentWorkers : Integer read fConcurrentWorkers;
  556. function OnExecute(aWorkerProc : TTaskProc) : IWorkTask;
  557. procedure Start;
  558. procedure Stop;
  559. end;
  560. implementation
  561. { TThreadedQueueCS<T> }
  562. procedure TThreadedQueueCS<T>.Clear;
  563. var
  564. obj : T;
  565. begin
  566. FQueueLock.Enter;
  567. try
  568. for obj in FQueue do
  569. begin
  570. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  571. end;
  572. SetLength(FQueue,0);
  573. finally
  574. FQueueLock.Leave;
  575. end;
  576. {$IFDEF FPC}
  577. FQueueCondVar.SetEvent;
  578. {$ELSE}
  579. FQueueCondVar.ReleaseAll;
  580. {$ENDIF}
  581. end;
  582. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  583. begin
  584. inherited Create;
  585. if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
  586. SetLength(FQueue, AQueueDepth);
  587. FQueueLock := TCriticalSection.Create;
  588. {$IFDEF FPC}
  589. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  590. {$ELSE}
  591. FQueueCondVar := TConditionVariableCS.Create;
  592. {$ENDIF}
  593. FPushTimeout := PushTimeout;
  594. FPopTimeout := PopTimeout;
  595. end;
  596. destructor TThreadedQueueCS<T>.Destroy;
  597. begin
  598. DoShutDown;
  599. FQueueLock.Free;
  600. FQueueCondVar.Free;
  601. inherited;
  602. end;
  603. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  604. begin
  605. FQueueLock.Enter;
  606. try
  607. SetLength(FQueue, Length(FQueue) + ADelta);
  608. finally
  609. FQueueLock.Leave;
  610. end;
  611. {$IFDEF FPC}
  612. FQueueCondVar.SetEvent;
  613. {$ELSE}
  614. FQueueCondVar.ReleaseAll;
  615. {$ENDIF}
  616. end;
  617. function TThreadedQueueCS<T>.PopItem: T;
  618. var
  619. LQueueSize: Integer;
  620. begin
  621. PopItem(LQueueSize, Result);
  622. end;
  623. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  624. begin
  625. AItem := Default(T);
  626. FQueueLock.Enter;
  627. try
  628. Result := wrSignaled;
  629. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  630. begin
  631. {$IFDEF FPC}
  632. Result := FQueueCondVar.WaitFor(FPopTimeout);
  633. {$ELSE}
  634. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  635. {$ENDIF}
  636. end;
  637. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  638. AItem := FQueue[FQueueOffset];
  639. FQueue[FQueueOffset] := Default(T);
  640. if FQueueSize = Length(FQueue) then
  641. begin
  642. {$IFDEF FPC}
  643. FQueueCondVar.SetEvent;
  644. {$ELSE}
  645. FQueueCondVar.ReleaseAll;
  646. {$ENDIF}
  647. end;
  648. Dec(FQueueSize);
  649. Inc(FQueueOffset);
  650. Inc(FTotalItemsPopped);
  651. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  652. finally
  653. AQueueSize := FQueueSize;
  654. FQueueLock.Leave;
  655. end;
  656. end;
  657. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  658. var
  659. LQueueSize: Integer;
  660. begin
  661. Result := PopItem(LQueueSize, AItem);
  662. end;
  663. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  664. begin
  665. PopItem(AQueueSize, Result);
  666. end;
  667. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  668. var
  669. LQueueSize: Integer;
  670. begin
  671. Result := PushItem(AItem, LQueueSize);
  672. end;
  673. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  674. begin
  675. FQueueLock.Enter;
  676. try
  677. if FQueueSize >= High(FQueue) then
  678. begin
  679. if FQueueSize < 1024 then Grow(FQueueSize)
  680. else Grow(FQueueSize Div 2);
  681. end;
  682. Result := wrSignaled;
  683. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  684. begin
  685. {$IFDEF FPC}
  686. Result := FQueueCondVar.WaitFor(FPushTimeout);
  687. {$ELSE}
  688. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  689. {$ENDIF}
  690. end;
  691. if FShutDown or (Result <> wrSignaled) then Exit;
  692. if FQueueSize = 0 then
  693. begin
  694. {$IFDEF FPC}
  695. FQueueCondVar.SetEvent;
  696. {$ELSE}
  697. FQueueCondVar.ReleaseAll;
  698. {$ENDIF}
  699. end;
  700. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  701. Inc(FQueueSize);
  702. Inc(FTotalItemsPushed);
  703. finally
  704. AQueueSize := FQueueSize;
  705. FQueueLock.Leave;
  706. end;
  707. end;
  708. procedure TThreadedQueueCS<T>.DoShutDown;
  709. begin
  710. FShutDown := True;
  711. {$IFDEF FPC}
  712. FQueueCondVar.SetEvent;
  713. {$ELSE}
  714. FQueueCondVar.ReleaseAll;
  715. {$ENDIF}
  716. end;
  717. { TThreadedQueueList<T> }
  718. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  719. begin
  720. inherited Create;
  721. fQueue := TQueue<T>.Create;
  722. fQueue.Capacity := AQueueDepth;
  723. fQueueSize := 0;
  724. fQueueLock := TCriticalSection.Create;
  725. {$IFDEF FPC}
  726. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  727. {$ELSE}
  728. fQueueCondVar := TConditionVariableCS.Create;
  729. {$ENDIF}
  730. fPushTimeout := PushTimeout;
  731. fPopTimeout := PopTimeout;
  732. end;
  733. destructor TThreadedQueueList<T>.Destroy;
  734. begin
  735. DoShutDown;
  736. fQueueLock.Free;
  737. fQueueCondVar.Free;
  738. fQueue.Free;
  739. inherited;
  740. end;
  741. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  742. begin
  743. fQueueLock.Enter;
  744. try
  745. fQueue.Capacity := fQueue.Capacity + ADelta;
  746. finally
  747. fQueueLock.Leave;
  748. end;
  749. {$IFDEF FPC}
  750. FQueueCondVar.SetEvent;
  751. {$ELSE}
  752. FQueueCondVar.ReleaseAll;
  753. {$ENDIF}
  754. end;
  755. function TThreadedQueueList<T>.PopItem: T;
  756. var
  757. LQueueSize: Integer;
  758. begin
  759. PopItem(LQueueSize, Result);
  760. end;
  761. {$IFDEF FPC}
  762. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  763. //var
  764. //crono : TChronometer;
  765. begin
  766. AItem := Default(T);
  767. //crono := TChronometer.Create(False);
  768. try
  769. Result := wrSignaled;
  770. //writeln('popitem');
  771. //crono.Start;
  772. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  773. begin
  774. //crono.Start;
  775. Result := FQueueCondVar.WaitFor(FPopTimeout);
  776. //crono.Stop;
  777. //writeln('in: ' + crono.ElapsedTime);
  778. //if result = twaitresult.wrError then result := twaitresult.wrError;
  779. end;
  780. //crono.Stop;
  781. //writeln('out: ' + crono.ElapsedTime);
  782. fQueueLock.Enter;
  783. try
  784. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  785. AItem := fQueue.Extract;
  786. Dec(FQueueSize);
  787. Inc(fTotalItemsPopped);
  788. finally
  789. fQueueLock.Leave;
  790. end;
  791. finally
  792. AQueueSize := fQueueSize;
  793. end;
  794. end;
  795. {$ELSE}
  796. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  797. begin
  798. AItem := Default(T);
  799. fQueueLock.Enter;
  800. try
  801. Result := wrSignaled;
  802. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  803. begin
  804. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  805. end;
  806. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  807. AItem := fQueue.Extract;
  808. if fQueueSize = fQueue.Count then
  809. begin
  810. FQueueCondVar.ReleaseAll;
  811. end;
  812. Dec(FQueueSize);
  813. Inc(fTotalItemsPopped);
  814. finally
  815. AQueueSize := fQueueSize;
  816. fQueueLock.Leave;
  817. end;
  818. end;
  819. {$ENDIF}
  820. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  821. var
  822. LQueueSize: Integer;
  823. begin
  824. Result := PopItem(LQueueSize, AItem);
  825. end;
  826. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  827. begin
  828. PopItem(AQueueSize, Result);
  829. end;
  830. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  831. var
  832. LQueueSize: Integer;
  833. begin
  834. Result := PushItem(AItem, LQueueSize);
  835. end;
  836. {$IFDEF FPC}
  837. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  838. begin
  839. FQueueLock.Enter;
  840. try
  841. if FQueueSize >= fQueue.Count then Grow(10);
  842. Result := wrSignaled;
  843. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  844. //begin
  845. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  846. //end;
  847. if fShutDown or (Result <> wrSignaled) then Exit;
  848. //if fQueueSize = 0 then
  849. //begin
  850. // FQueueCondVar.SetEvent;
  851. //end;
  852. fQueue.Enqueue(AItem);
  853. Inc(FQueueSize);
  854. Inc(fTotalItemsPushed);
  855. finally
  856. AQueueSize := fQueueSize;
  857. FQueueLock.Leave;
  858. //FQueueCondVar.SetEvent;
  859. end;
  860. end;
  861. {$ELSE}
  862. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  863. begin
  864. FQueueLock.Enter;
  865. try
  866. Result := wrSignaled;
  867. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  868. //begin
  869. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  870. //end;
  871. if fShutDown or (Result <> wrSignaled) then Exit;
  872. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  873. fQueue.Enqueue(AItem);
  874. Inc(FQueueSize);
  875. Inc(fTotalItemsPushed);
  876. finally
  877. AQueueSize := fQueueSize;
  878. FQueueLock.Leave;
  879. end;
  880. end;
  881. {$ENDIF}
  882. procedure TThreadedQueueList<T>.DoShutDown;
  883. begin
  884. fShutDown := True;
  885. {$IFDEF FPC}
  886. FQueueCondVar.SetEvent;
  887. {$ELSE}
  888. FQueueCondVar.ReleaseAll;
  889. {$ENDIF}
  890. end;
  891. {$IFNDEF FPC}
  892. { TThreadObjectList<T> }
  893. procedure TThreadObjectList<T>.Add(const Item: T);
  894. begin
  895. LockList;
  896. try
  897. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  898. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  899. finally
  900. UnlockList;
  901. end;
  902. end;
  903. procedure TThreadObjectList<T>.Clear;
  904. begin
  905. LockList;
  906. try
  907. fList.Clear;
  908. finally
  909. UnlockList;
  910. end;
  911. end;
  912. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  913. begin
  914. inherited Create;
  915. fLock := TObject.Create;
  916. fList := TObjectList<T>.Create;
  917. fDuplicates := dupIgnore;
  918. end;
  919. destructor TThreadObjectList<T>.Destroy;
  920. begin
  921. LockList;
  922. try
  923. fList.Free;
  924. inherited Destroy;
  925. finally
  926. UnlockList;
  927. fLock.Free;
  928. end;
  929. end;
  930. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  931. begin
  932. LockList;
  933. try
  934. Result := fList[aIndex];
  935. finally
  936. UnlockList;
  937. end;
  938. end;
  939. function TThreadObjectList<T>.LockList: TObjectList<T>;
  940. begin
  941. System.TMonitor.Enter(fLock);
  942. Result := fList;
  943. end;
  944. procedure TThreadObjectList<T>.Remove(const Item: T);
  945. begin
  946. RemoveItem(Item, TDirection.FromBeginning);
  947. end;
  948. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  949. begin
  950. LockList;
  951. try
  952. fList.RemoveItem(Item, Direction);
  953. finally
  954. UnlockList;
  955. end;
  956. end;
  957. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  958. begin
  959. LockList;
  960. try
  961. fList[aIndex] := aValue;
  962. finally
  963. UnlockList;
  964. end;
  965. end;
  966. procedure TThreadObjectList<T>.UnlockList;
  967. begin
  968. System.TMonitor.Exit(fLock);
  969. end;
  970. {$ENDIF}
  971. { TAnonymousThread }
  972. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  973. begin
  974. fThread := TAdvThread.Create(aProc,aSynchronize);
  975. end;
  976. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  977. begin
  978. Result := TAnonymousThread.Create(aProc,False);
  979. end;
  980. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  981. begin
  982. Result := TAnonymousThread.Create(aProc,True);
  983. end;
  984. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  985. begin
  986. Result := Self;
  987. fThread.OnException(aProc);
  988. end;
  989. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  990. begin
  991. Result := Self;
  992. fThread.OnTerminate(aProc,False);
  993. end;
  994. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  995. begin
  996. Result := Self;
  997. fThread.OnTerminate(aProc,True);
  998. end;
  999. procedure TAnonymousThread.Start;
  1000. begin
  1001. fThread.Start;
  1002. end;
  1003. { TTask }
  1004. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  1005. var
  1006. i : Integer;
  1007. begin
  1008. fTaskStatus := TWorkTaskStatus.wtsPending;
  1009. fCustomFaultPolicy := False;
  1010. fNumRetries := 0;
  1011. fExecuteWithSync := False;
  1012. fTerminateWithSync := False;
  1013. fExceptionWithSync := False;
  1014. fFaultControl := TFaultControl.Create;
  1015. fFaultControl.OnRetry := DoRetry;
  1016. fOwnedParams := aOwnedParams;
  1017. fParamList := TParamList.Create(True);
  1018. for i := Low(aParamArray) to High(aParamArray) do
  1019. begin
  1020. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  1021. {$IFDEF FPC}
  1022. fParamList[i].Value._AddRef;
  1023. {$ENDIF}
  1024. end;
  1025. fExecuteProc := aTaskProc;
  1026. fEnabled := False;
  1027. end;
  1028. destructor TTask.Destroy;
  1029. begin
  1030. fFaultControl.Free;
  1031. //free passed params
  1032. fParamList.Free;
  1033. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1034. inherited;
  1035. end;
  1036. procedure TTask.Disable;
  1037. begin
  1038. fEnabled := False;
  1039. end;
  1040. procedure TTask.DoException(aException : Exception);
  1041. begin
  1042. fTaskStatus := TWorkTaskStatus.wtsException;
  1043. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1044. else raise aException;
  1045. end;
  1046. procedure TTask.DoExecute;
  1047. begin
  1048. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1049. DoInitialize;
  1050. repeat
  1051. try
  1052. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1053. fTaskStatus := TWorkTaskStatus.wtsDone;
  1054. fFaultControl.SuccessExecution;
  1055. except
  1056. on E : Exception do
  1057. begin
  1058. fTaskStatus := TWorkTaskStatus.wtsException;
  1059. {$IFNDEF FPC}
  1060. {$IF DELPHIRX10_UP}
  1061. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1062. {$ELSE}
  1063. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1064. {$ENDIF}
  1065. {$ELSE}
  1066. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1067. {$ENDIF}
  1068. end;
  1069. end;
  1070. until not fFaultControl.NeedToRetry;
  1071. end;
  1072. procedure TTask.DoInitialize;
  1073. begin
  1074. try
  1075. fFaultControl.Reset;
  1076. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1077. except
  1078. on E : Exception do
  1079. begin
  1080. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1081. end;
  1082. end;
  1083. end;
  1084. function TTask.Done: Boolean;
  1085. begin
  1086. Result := not fFaultControl.TaskFailed;
  1087. end;
  1088. function TTask.Failed: Boolean;
  1089. begin
  1090. Result := fFaultControl.TaskFailed;
  1091. end;
  1092. function TTask.CircuitBreaked: Boolean;
  1093. begin
  1094. Result := fFaultControl.CircuitBreaked;
  1095. end;
  1096. function TTask.LastException: Exception;
  1097. begin
  1098. Result := fFaultControl.LastException;
  1099. end;
  1100. function TTask.MaxRetries: Integer;
  1101. begin
  1102. Result := fFaultControl.MaxRetries;
  1103. end;
  1104. function TTask.NumRetries: Integer;
  1105. begin
  1106. Result := fFaultControl.NumRetries;
  1107. end;
  1108. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1109. begin
  1110. vStopRetries := False;
  1111. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1112. end;
  1113. procedure TTask.DoTerminate;
  1114. begin
  1115. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1116. end;
  1117. procedure TTask.Enable;
  1118. begin
  1119. fEnabled := True;
  1120. end;
  1121. function TTask.GetIdTask: Int64;
  1122. begin
  1123. Result := fIdTask;
  1124. end;
  1125. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1126. begin
  1127. {$IFDEF FPC}
  1128. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1129. {$ENDIF}
  1130. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1131. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1132. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1133. end;
  1134. procedure TTask.SetIdTask(Value : Int64);
  1135. begin
  1136. fIdTask := Value;
  1137. end;
  1138. function TTask.GetNumWorker: Integer;
  1139. begin
  1140. Result := fNumWorker;
  1141. end;
  1142. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1143. begin
  1144. Result := fParamList[aIndex].Value;
  1145. end;
  1146. function TTask.GetParam(const aName: string): TFlexValue;
  1147. var
  1148. paramvalue : TParamValue;
  1149. begin
  1150. for paramvalue in fParamList do
  1151. begin
  1152. if CompareText(paramvalue.Name,aName) = 0 then
  1153. begin
  1154. Exit(paramvalue.Value)
  1155. end;
  1156. end;
  1157. //if not exists
  1158. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1159. end;
  1160. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1161. begin
  1162. Result := @fParamList[aIndex].Value;
  1163. end;
  1164. function TTask.GetResult: TFlexValue;
  1165. begin
  1166. Result := fResult;
  1167. end;
  1168. function TTask.IsEnabled: Boolean;
  1169. begin
  1170. Result := fEnabled;
  1171. end;
  1172. procedure TTask.SetNumWorker(Value: Integer);
  1173. begin
  1174. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1175. fNumWorker := Value;
  1176. end;
  1177. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1178. begin
  1179. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1180. fParamList[aIndex].Value := Value;
  1181. end;
  1182. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1183. var
  1184. paramvalue : TParamValue;
  1185. begin
  1186. //check if already exists parameter
  1187. for paramvalue in fParamList do
  1188. begin
  1189. if CompareText(paramvalue.Name,aName) = 0 then
  1190. begin
  1191. paramvalue.Value := Value;
  1192. Exit;
  1193. end;
  1194. end;
  1195. //if not exists, create one
  1196. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1197. end;
  1198. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1199. begin
  1200. SetParam(aName,Value,False);
  1201. end;
  1202. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1203. begin
  1204. fFaultControl.MaxRetries := aMaxRetries;
  1205. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1206. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1207. fCustomFaultPolicy := True;
  1208. end;
  1209. procedure TTask.SetResult(aValue: TFlexValue);
  1210. begin
  1211. fResult := aValue;
  1212. end;
  1213. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1214. begin
  1215. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1216. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1217. fFaultControl.WaitTimeMultiplierFactor := 1;
  1218. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1219. fCustomFaultPolicy := True;
  1220. end;
  1221. function TTask.TaskStatus: TWorkTaskStatus;
  1222. begin
  1223. Result := fTaskStatus;
  1224. end;
  1225. { TWorkTask }
  1226. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1227. begin
  1228. fExceptProc := aTaskProc;
  1229. Result := Self;
  1230. end;
  1231. function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  1232. begin
  1233. fExceptionWithSync := True;
  1234. Result := OnException(aTaskProc);
  1235. end;
  1236. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1237. begin
  1238. fInitializeProc := aTaskProc;
  1239. Result := Self;
  1240. end;
  1241. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1242. begin
  1243. fRetryProc := aTaskProc;
  1244. Result := Self;
  1245. end;
  1246. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1247. begin
  1248. fTerminateProc := aTaskProc;
  1249. Result := Self;
  1250. end;
  1251. function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  1252. begin
  1253. fTerminateWithSync := True;
  1254. Result := OnTerminated(aTaskProc);
  1255. end;
  1256. procedure TWorkTask.Run;
  1257. begin
  1258. fEnabled := True;
  1259. end;
  1260. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1261. begin
  1262. Result := Self;
  1263. SetParam(aName,aValue);
  1264. end;
  1265. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1266. begin
  1267. Result := Self;
  1268. SetParam(aName,aValue,aOwned);
  1269. end;
  1270. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1271. begin
  1272. Result := Self;
  1273. SetRetryPolicy(aMaxRetries,0,1);
  1274. end;
  1275. function TWorkTask.RetryForever: IWorkTask;
  1276. begin
  1277. Result := Self;
  1278. SetRetryPolicy(-1,0,1);
  1279. end;
  1280. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1281. begin
  1282. Result := Self;
  1283. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1284. end;
  1285. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1286. begin
  1287. Result := Self;
  1288. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1289. end;
  1290. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1291. begin
  1292. Result := Self;
  1293. SetRetryPolicy(aWaitTimeArray);
  1294. end;
  1295. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1296. begin
  1297. Result := Self;
  1298. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1299. end;
  1300. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1301. begin
  1302. Result := Self;
  1303. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1304. end;
  1305. { TBackgroundTasks }
  1306. procedure TBackgroundTasks.CancelAll;
  1307. begin
  1308. fTaskQueue.Clear;
  1309. end;
  1310. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  1311. begin
  1312. fMaxQueue := 0;
  1313. fConcurrentWorkers := aConcurrentWorkers;
  1314. fInsertTimeout := INFINITE;
  1315. fExtractTimeout := 5000;
  1316. fNumPushedTasks := 0;
  1317. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  1318. end;
  1319. destructor TBackgroundTasks.Destroy;
  1320. begin
  1321. CancelAll;
  1322. fTaskQueue.DoShutDown;
  1323. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1324. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1325. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1326. inherited;
  1327. end;
  1328. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1329. begin
  1330. Result := fTaskQueue.QueueSize;
  1331. end;
  1332. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1333. begin
  1334. Result := AddTask([],False,aTaskProc);
  1335. end;
  1336. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1337. var
  1338. worktask : IWorkTask;
  1339. begin
  1340. if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
  1341. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1342. Inc(fNumPushedTasks);
  1343. worktask.SetIdTask(fNumPushedTasks);
  1344. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1345. begin
  1346. Result := worktask;
  1347. end;
  1348. end;
  1349. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1350. begin
  1351. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1352. TTask(Result).ExecuteWithSync := True;
  1353. end;
  1354. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1355. begin
  1356. Result := AddTask_Sync([],False,aTaskProc);
  1357. end;
  1358. procedure TBackgroundTasks.Start;
  1359. var
  1360. i : Integer;
  1361. worker : TWorker;
  1362. begin
  1363. //create workers
  1364. if fWorkerPool <> nil then fWorkerPool.Free;
  1365. fWorkerPool := TWorkerPool.Create(True);
  1366. for i := 1 to fConcurrentWorkers do
  1367. begin
  1368. worker := TQueueWorker.Create(i,fTaskQueue);
  1369. fWorkerPool.Add(worker);
  1370. worker.Start;
  1371. end;
  1372. end;
  1373. { TWorker }
  1374. constructor TWorker.Create;
  1375. begin
  1376. inherited Create(True);
  1377. fDefaultFaultPolicy := TFaultPolicy.Create;
  1378. fStatus := TWorkerStatus.wsSuspended;
  1379. FreeOnTerminate := False;
  1380. end;
  1381. destructor TWorker.Destroy;
  1382. begin
  1383. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1384. inherited;
  1385. end;
  1386. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1387. begin
  1388. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1389. end;
  1390. procedure TWorker.Execute;
  1391. begin
  1392. end;
  1393. procedure TWorker.ExecuteTask;
  1394. begin
  1395. fCurrentTask.DoExecute;
  1396. end;
  1397. procedure TWorker.TerminateTask;
  1398. begin
  1399. fCurrentTask.DoTerminate;
  1400. end;
  1401. { TSimpleWorker }
  1402. constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
  1403. begin
  1404. inherited Create;
  1405. fRunOnce := aRunOnce;
  1406. fCurrentTask := aTask;
  1407. FreeOnTerminate := True;
  1408. end;
  1409. procedure TSimpleWorker.Execute;
  1410. begin
  1411. fStatus := TWorkerStatus.wsIdle;
  1412. while not Terminated do
  1413. begin
  1414. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1415. try
  1416. fStatus := TWorkerStatus.wsWorking;
  1417. try
  1418. SetFaultPolicy(TTask(fCurrentTask));
  1419. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1420. else fCurrentTask.DoExecute;
  1421. except
  1422. on E : Exception do
  1423. begin
  1424. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1425. else raise ETaskExecutionError.Create(e.Message);
  1426. end;
  1427. end;
  1428. finally
  1429. fStatus := TWorkerStatus.wsIdle;
  1430. if fRunOnce then Terminate;
  1431. try
  1432. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1433. else fCurrentTask.DoTerminate;
  1434. except
  1435. on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1436. end;
  1437. end;
  1438. end;
  1439. fStatus := TWorkerStatus.wsSuspended
  1440. end;
  1441. { TQueueWorker }
  1442. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1443. begin
  1444. inherited Create;
  1445. fNumWorker := aNumWorker;
  1446. fTaskQueue := aTaskQueue;
  1447. end;
  1448. procedure TQueueWorker.Execute;
  1449. begin
  1450. fStatus := TWorkerStatus.wsIdle;
  1451. while not Terminated do
  1452. begin
  1453. fCurrentTask := fTaskQueue.PopItem;
  1454. if fCurrentTask <> nil then
  1455. try
  1456. fStatus := TWorkerStatus.wsWorking;
  1457. try
  1458. fCurrentIdTask := fCurrentTask.GetIdTask;
  1459. SetFaultPolicy(TTask(fCurrentTask));
  1460. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1461. else fCurrentTask.DoExecute;
  1462. except
  1463. on E : Exception do
  1464. begin
  1465. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1466. else raise ETaskExecutionError.Create(e.Message);
  1467. end;
  1468. end;
  1469. finally
  1470. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1471. else fCurrentTask.DoTerminate;
  1472. fStatus := TWorkerStatus.wsIdle;
  1473. end;
  1474. end;
  1475. fStatus := TWorkerStatus.wsSuspended
  1476. end;
  1477. { TScheduledWorker }
  1478. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1479. begin
  1480. inherited Create;
  1481. {$IFNDEF DELPHILINUX}
  1482. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1483. {$ENDIF}
  1484. FreeOnTerminate := True;
  1485. fCurrentTask := aScheduledTask;
  1486. end;
  1487. procedure TScheduledWorker.Execute;
  1488. begin
  1489. fStatus := TWorkerStatus.wsIdle;
  1490. if Assigned(fCurrentTask) then
  1491. begin
  1492. try
  1493. fStatus := TWorkerStatus.wsWorking;
  1494. try
  1495. SetFaultPolicy(TTask(fCurrentTask));
  1496. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1497. else fCurrentTask.DoExecute;
  1498. fStatus := TWorkerStatus.wsIdle;
  1499. except
  1500. on E : Exception do
  1501. begin
  1502. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1503. else raise ETaskExecutionError.Create(e.Message);
  1504. end;
  1505. end;
  1506. finally
  1507. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1508. else fCurrentTask.DoTerminate;
  1509. //check if expired
  1510. if (fCurrentTask as IScheduledTask).IsFinished then
  1511. begin
  1512. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1513. else (fCurrentTask as IScheduledTask).DoExpire;
  1514. end;
  1515. end;
  1516. end;
  1517. fCurrentTask := nil;
  1518. fStatus := TWorkerStatus.wsSuspended;
  1519. end;
  1520. procedure TScheduledWorker.ExpireTask;
  1521. begin
  1522. (fCurrentTask as IScheduledTask).DoExpire;
  1523. end;
  1524. { TScheduledTasks }
  1525. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1526. begin
  1527. Result := AddTask(aTaskName,[],False,aTaskProc);
  1528. end;
  1529. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1530. var
  1531. scheduletask : TScheduledTask;
  1532. begin
  1533. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1534. scheduletask.Name := aTaskName;
  1535. Inc(fNumPushedTasks);
  1536. scheduletask.SetIdTask(fNumPushedTasks);
  1537. fTaskList.Add(scheduletask);
  1538. Result := scheduletask;
  1539. end;
  1540. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1541. begin
  1542. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1543. TTask(Result).ExecuteWithSync := True;
  1544. end;
  1545. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1546. begin
  1547. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1548. end;
  1549. constructor TScheduledTasks.Create;
  1550. begin
  1551. fNumPushedTasks := 0;
  1552. fIsStarted := False;
  1553. fFaultPolicy := TFaultPolicy.Create;
  1554. fTaskList := TScheduledTaskList.Create;
  1555. end;
  1556. destructor TScheduledTasks.Destroy;
  1557. begin
  1558. if Assigned(fScheduler) then
  1559. begin
  1560. fScheduler.Terminate;
  1561. fScheduler.WaitFor;
  1562. fScheduler.Free;
  1563. end;
  1564. if Assigned(fTaskList) then fTaskList.Free;
  1565. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1566. inherited;
  1567. end;
  1568. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1569. begin
  1570. Result := fScheduler.Get(aIdTask);
  1571. end;
  1572. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1573. begin
  1574. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1575. Result := fScheduler.Get(aTaskName);
  1576. end;
  1577. procedure TScheduledTasks.Start;
  1578. begin
  1579. if fIsStarted then Exit;
  1580. if not Assigned(fScheduler) then
  1581. begin
  1582. fScheduler := TScheduler.Create(fTaskList);
  1583. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1584. end;
  1585. fScheduler.Start;
  1586. fIsStarted := True;
  1587. end;
  1588. procedure TScheduledTasks.Stop;
  1589. begin
  1590. if Assigned(fScheduler) then fScheduler.Terminate;
  1591. fIsStarted := False;
  1592. end;
  1593. { TScheduledTask }
  1594. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1595. begin
  1596. Result := Self;
  1597. SetParam(aName,aValue);
  1598. end;
  1599. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1600. begin
  1601. Result := Self;
  1602. SetParam(aName,aValue);
  1603. end;
  1604. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1605. begin
  1606. Result := Self;
  1607. ClearSchedule;
  1608. fScheduleMode := TScheduleMode.smRunOnce;
  1609. fStartDate := aStartDate;
  1610. fNextExecution := aStartDate;
  1611. end;
  1612. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1613. begin
  1614. Result := Self;
  1615. ClearSchedule;
  1616. fScheduleMode := TScheduleMode.smRunOnce;
  1617. fStartDate := IncMinute(Now(),aMinutes);
  1618. fNextExecution := fStartDate;
  1619. end;
  1620. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1621. begin
  1622. Result := Self;
  1623. ClearSchedule;
  1624. fScheduleMode := TScheduleMode.smRunOnce;
  1625. fStartDate := IncSecond(Now(),aSeconds);
  1626. fNextExecution := fStartDate;
  1627. end;
  1628. function TScheduledTask.StartNow: IScheduledTask;
  1629. begin
  1630. Result := Self;
  1631. ClearSchedule;
  1632. fScheduleMode := TScheduleMode.smRunOnce;
  1633. fStartDate := Now();
  1634. fNextExecution := fStartDate;
  1635. end;
  1636. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1637. begin
  1638. Result := Self;
  1639. ClearSchedule;
  1640. fScheduleMode := TScheduleMode.smRunOnce;
  1641. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1642. fNextExecution := fStartDate;
  1643. end;
  1644. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1645. begin
  1646. Result := Self;
  1647. ClearSchedule;
  1648. fScheduleMode := TScheduleMode.smRunOnce;
  1649. fStartDate := ChangeDateOfADay(Now(),aHour,aMinute,aSecond);
  1650. fNextExecution := fStartDate;
  1651. end;
  1652. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1653. begin
  1654. Result := Self;
  1655. ClearSchedule;
  1656. fScheduleMode := TScheduleMode.smRunOnce;
  1657. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1658. fNextExecution := fStartDate;
  1659. end;
  1660. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1661. begin
  1662. Result := Self;
  1663. SetRetryPolicy(aMaxRetries,0,1);
  1664. end;
  1665. function TScheduledTask.RetryForever: IScheduledTask;
  1666. begin
  1667. Result := Self;
  1668. SetRetryPolicy(-1,0,1);
  1669. end;
  1670. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1671. begin
  1672. Result := Self;
  1673. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1674. end;
  1675. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1676. begin
  1677. Result := Self;
  1678. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1679. end;
  1680. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1681. begin
  1682. Result := Self;
  1683. SetRetryPolicy(aWaitTimeArray);
  1684. end;
  1685. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1686. begin
  1687. Result := Self;
  1688. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1689. end;
  1690. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1691. begin
  1692. Result := Self;
  1693. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1694. end;
  1695. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1696. begin
  1697. if fStartDate = 0.0 then ClearSchedule;
  1698. fScheduleMode := TScheduleMode.smRepeatMode;
  1699. fTimeMeasure := aTimeMeasure;
  1700. fTimeInterval := aInterval;
  1701. if fStartDate < Now() then fStartDate := Now();
  1702. fNextExecution := fStartDate;
  1703. fEnabled := True;
  1704. end;
  1705. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1706. begin
  1707. if fStartDate = 0.0 then ClearSchedule;
  1708. fScheduleMode := TScheduleMode.smRepeatMode;
  1709. fTimeMeasure := aTimeMeasure;
  1710. fTimeInterval := aInterval;
  1711. if fStartDate < Now() then fStartDate := Now();
  1712. fExpirationDate := aEndTime;
  1713. fNextExecution := fStartDate;
  1714. fEnabled := True;
  1715. end;
  1716. procedure TScheduledTask.RepeatEveryDay;
  1717. begin
  1718. RepeatEvery(1,tmDays);
  1719. end;
  1720. procedure TScheduledTask.RepeatEveryWeek;
  1721. begin
  1722. RepeatEvery(7,tmDays);
  1723. end;
  1724. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1725. begin
  1726. if fStartDate = 0.0 then ClearSchedule;
  1727. fScheduleMode := TScheduleMode.smRepeatMode;
  1728. fTimeMeasure := aTimeMeasure;
  1729. fTimeInterval := aInterval;
  1730. if fStartDate < Now() then fStartDate := Now();
  1731. fExpirationTimes := aRepeatTimes;
  1732. fNextExecution := fStartDate;
  1733. fEnabled := True;
  1734. end;
  1735. procedure TScheduledTask.RunOnce;
  1736. begin
  1737. fScheduleMode := TScheduleMode.smRunOnce;
  1738. if fStartDate < Now() then fStartDate := Now();
  1739. fNextExecution := fStartDate;
  1740. fEnabled := True;
  1741. end;
  1742. procedure TScheduledTask.Cancel;
  1743. begin
  1744. fFinished := True;
  1745. end;
  1746. function TScheduledTask.CheckSchedule: Boolean;
  1747. begin
  1748. Result := False;
  1749. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1750. if fScheduleMode = TScheduleMode.smRunOnce then
  1751. begin
  1752. //if start date reached
  1753. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1754. begin
  1755. fLastExecution := Now();
  1756. Inc(fExecutionTimes);
  1757. fFinished := True;
  1758. Result := True;
  1759. end;
  1760. end
  1761. else
  1762. begin
  1763. //if next execution reached
  1764. if Now() >= fNextExecution then
  1765. begin
  1766. //check expiration limits
  1767. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1768. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1769. begin
  1770. fFinished := True;
  1771. Exit;
  1772. end;
  1773. //calculate next execution
  1774. case fTimeMeasure of
  1775. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1776. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1777. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1778. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1779. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1780. end;
  1781. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1782. else
  1783. begin
  1784. fLastExecution := Now();
  1785. Inc(fExecutionTimes);
  1786. Result := True;
  1787. end;
  1788. end;
  1789. end;
  1790. end;
  1791. procedure TScheduledTask.ClearSchedule;
  1792. begin
  1793. inherited;
  1794. fFinished := False;
  1795. fStartDate := 0.0;
  1796. fLastExecution := 0.0;
  1797. fNextExecution := 0.0;
  1798. fExpirationDate := 0.0;
  1799. fScheduleMode := TScheduleMode.smRunOnce;
  1800. fTimeMeasure := TTimeMeasure.tmSeconds;
  1801. fTimeInterval := 0;
  1802. end;
  1803. procedure TScheduledTask.DoExpire;
  1804. begin
  1805. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1806. fEnabled := False;
  1807. end;
  1808. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1809. begin
  1810. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1811. end;
  1812. function TScheduledTask.GetTaskName: string;
  1813. begin
  1814. Result := fName;
  1815. end;
  1816. function TScheduledTask.IsFinished: Boolean;
  1817. begin
  1818. Result := fFinished;
  1819. end;
  1820. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1821. begin
  1822. fExceptProc := aTaskProc;
  1823. Result := Self;
  1824. end;
  1825. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1826. begin
  1827. Result := OnException(aTaskProc);
  1828. TTask(Result).ExceptionWithSync := True;
  1829. end;
  1830. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1831. begin
  1832. fRetryProc := aTaskProc;
  1833. Result := Self;
  1834. end;
  1835. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1836. begin
  1837. fExpiredProc := aTaskProc;
  1838. Result := Self;
  1839. end;
  1840. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1841. begin
  1842. Result := OnExpired(aTaskProc);
  1843. TScheduledTask(Result).ExpireWithSync := True;
  1844. end;
  1845. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1846. begin
  1847. fInitializeProc := aTaskProc;
  1848. Result := Self;
  1849. end;
  1850. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1851. begin
  1852. fTerminateProc := aTaskProc;
  1853. Result := Self;
  1854. end;
  1855. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1856. begin
  1857. Result := OnTerminated(aTaskProc);
  1858. TTask(Result).TerminateWithSync := True;
  1859. end;
  1860. { TScheduler }
  1861. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1862. begin
  1863. inherited Create(True);
  1864. FreeOnTerminate := False;
  1865. fListLock := TCriticalSection.Create;
  1866. fRemoveTaskAfterExpiration := False;
  1867. fTaskList := aTaskList;
  1868. {$IFDEF FPC}
  1869. fCondVar := TSimpleEvent.Create;
  1870. {$ELSE}
  1871. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1872. {$ENDIF}
  1873. end;
  1874. destructor TScheduler.Destroy;
  1875. begin
  1876. fCondVar.SetEvent;
  1877. fCondVar.Free;
  1878. fTaskList := nil;
  1879. fListLock.Free;
  1880. inherited;
  1881. end;
  1882. procedure TScheduler.Execute;
  1883. var
  1884. task : IScheduledTask;
  1885. worker : TScheduledWorker;
  1886. numworker : Int64;
  1887. begin
  1888. numworker := 0;
  1889. while not Terminated do
  1890. begin
  1891. fListLock.Enter;
  1892. try
  1893. for task in fTaskList do
  1894. begin
  1895. if (task.IsEnabled) and (not task.IsFinished) then
  1896. begin
  1897. if task.CheckSchedule then
  1898. begin
  1899. Inc(numworker);
  1900. worker := TScheduledWorker.Create(numworker,task);
  1901. worker.Start;
  1902. end;
  1903. end
  1904. else
  1905. begin
  1906. if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then fTaskList.Remove(task);
  1907. end;
  1908. end;
  1909. task := nil;
  1910. finally
  1911. fListLock.Leave;
  1912. end;
  1913. fCondVar.WaitFor(250);
  1914. end;
  1915. end;
  1916. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1917. begin
  1918. Result := fTaskList.Add(aTask);
  1919. end;
  1920. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1921. var
  1922. task : IScheduledTask;
  1923. begin
  1924. fListLock.Enter;
  1925. try
  1926. for task in fTaskList do
  1927. begin
  1928. if task.IdTask = aIdTask then Exit(task);
  1929. end;
  1930. finally
  1931. fListLock.Leave;
  1932. end;
  1933. end;
  1934. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1935. var
  1936. task : IScheduledTask;
  1937. begin
  1938. fListLock.Enter;
  1939. try
  1940. for task in fTaskList do
  1941. begin
  1942. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  1943. end;
  1944. finally
  1945. fListLock.Leave;
  1946. end;
  1947. end;
  1948. { TAdvThread }
  1949. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  1950. begin
  1951. inherited Create(True);
  1952. FreeOnTerminate := True;
  1953. fExecuteWithSync := aSynchronize;
  1954. fExecuteProc := aProc;
  1955. end;
  1956. procedure TAdvThread.DoExecute;
  1957. begin
  1958. try
  1959. if Assigned(fExecuteProc) then fExecuteProc;
  1960. except
  1961. on E : Exception do
  1962. begin
  1963. {$IFNDEF FPC}
  1964. {$IF DELPHIRX10_UP}
  1965. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  1966. {$ELSE}
  1967. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1968. {$ENDIF}
  1969. {$ELSE}
  1970. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  1971. {$ENDIF}
  1972. else raise e;
  1973. end;
  1974. end;
  1975. end;
  1976. procedure TAdvThread.CallToTerminate;
  1977. begin
  1978. if Assigned(fTerminateProc) then fTerminateProc;
  1979. end;
  1980. procedure TAdvThread.DoTerminate;
  1981. begin
  1982. if fTerminateWithSync then Synchronize(CallToTerminate)
  1983. else CallToTerminate;
  1984. end;
  1985. procedure TAdvThread.Execute;
  1986. begin
  1987. if fExecuteWithSync then Synchronize(DoExecute)
  1988. else DoExecute;
  1989. end;
  1990. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  1991. begin
  1992. fExceptionProc := aProc;
  1993. end;
  1994. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  1995. begin
  1996. fTerminateWithSync := aSynchronize;
  1997. fTerminateProc := aProc;
  1998. end;
  1999. { TRunTask }
  2000. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  2001. begin
  2002. Result := Execute([],False,aTaskProc);
  2003. end;
  2004. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  2005. begin
  2006. Result := Execute_Sync([],False,aTaskProc);
  2007. end;
  2008. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2009. var
  2010. task : TWorkTask;
  2011. worker : TSimpleWorker;
  2012. begin
  2013. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2014. task.ExecuteWithSync := False;
  2015. Result := task;
  2016. worker := TSimpleWorker.Create(task);
  2017. worker.Start;
  2018. end;
  2019. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2020. var
  2021. task : TWorkTask;
  2022. worker : TSimpleWorker;
  2023. begin
  2024. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2025. task.ExecuteWithSync := True;
  2026. Result := task;
  2027. worker := TSimpleWorker.Create(task);
  2028. worker.Start;
  2029. end;
  2030. { TParamValue }
  2031. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  2032. begin
  2033. inherited Create;
  2034. fName := aName;
  2035. fValue := aValue;
  2036. fOwned := aOwnedValue;
  2037. end;
  2038. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  2039. begin
  2040. inherited Create;
  2041. fName := aName;
  2042. fValue := aValue;
  2043. fOwned := aOwnedValue;
  2044. end;
  2045. constructor TParamValue.Create;
  2046. begin
  2047. fName := '';
  2048. fOwned := False;
  2049. end;
  2050. destructor TParamValue.Destroy;
  2051. begin
  2052. {$IFDEF FPC}
  2053. fValue._Release;
  2054. {$ENDIF}
  2055. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  2056. inherited;
  2057. end;
  2058. { TBackgroundWorkers }
  2059. constructor TBackgroundWorkers.Create(aConcurrentWorkers: Integer);
  2060. begin
  2061. fConcurrentWorkers := aConcurrentWorkers;
  2062. end;
  2063. destructor TBackgroundWorkers.Destroy;
  2064. begin
  2065. fWorkerPool.Free;
  2066. inherited;
  2067. end;
  2068. function TBackgroundWorkers.OnExecute(aWorkerProc: TTaskProc): IWorkTask;
  2069. begin
  2070. // fWorkerTask := TWorkTask.Create([],False,procedure(task : ITask)
  2071. // begin
  2072. // aWorkerProc;
  2073. // end);
  2074. fWorkerTask := TWorkTask.Create([],False,aWorkerProc);
  2075. fWorkerTask.Run;
  2076. Result := fWorkerTask;
  2077. end;
  2078. procedure TBackgroundWorkers.Start;
  2079. var
  2080. i : Integer;
  2081. worker : TWorker;
  2082. begin
  2083. for i := 1 to fConcurrentWorkers do
  2084. begin
  2085. worker := TSimpleWorker.Create(fWorkerTask,False);
  2086. worker.Start;
  2087. end;
  2088. end;
  2089. procedure TBackgroundWorkers.Stop;
  2090. var
  2091. worker : TWorker;
  2092. begin
  2093. for worker in fWorkerPool do
  2094. begin
  2095. worker.Terminate;
  2096. worker.WaitFor;
  2097. fWorkerPool.Remove(worker);
  2098. end;
  2099. end;
  2100. end.