Quick.Threads.pas 74 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2021 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 08/09/2021
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  43. TThreadedQueueCS<T> = class
  44. private
  45. FQueue: array of T;
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  53. FShutDown: Boolean;
  54. FPushTimeout, FPopTimeout: Cardinal;
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  133. TSimpleThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  136. fTimeoutFlag : TLightweightEvent;
  137. public
  138. constructor Create(aProc: TProc; aCreateSuspended, aFreeOnTerminate : Boolean);
  139. destructor Destroy; override;
  140. procedure Execute; override;
  141. function WaitFor(const aTimeout : Cardinal) : TWaitResult; overload;
  142. end;
  143. TAdvThread = class(TThread)
  144. private
  145. fExecuteProc : TProc;
  146. fExceptionProc : TAnonExceptionProc;
  147. fTerminateProc : TProc;
  148. fExecuteWithSync : Boolean;
  149. fTerminateWithSync : Boolean;
  150. procedure DoExecute;
  151. procedure CallToTerminate;
  152. protected
  153. procedure DoTerminate; override;
  154. public
  155. constructor Create(aProc : TProc; aSynchronize : Boolean);
  156. procedure OnException(aProc : TAnonExceptionProc);
  157. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  158. procedure Execute; override;
  159. end;
  160. IAnonymousThread = interface
  161. procedure Start;
  162. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  163. function OnTerminate(aProc : TProc) : IAnonymousThread;
  164. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  165. end;
  166. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  167. private
  168. fThread : TAdvThread;
  169. constructor Create(aProc : TProc; aSynchronize : Boolean);
  170. public
  171. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  172. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  173. procedure Start;
  174. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  175. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  176. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  177. end;
  178. TParamValue = class
  179. private
  180. fName : string;
  181. fValue : TFlexValue;
  182. fOwned : Boolean;
  183. public
  184. constructor Create; overload;
  185. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  186. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  187. destructor Destroy; override;
  188. property Name : string read fName write fName;
  189. property Value : TFlexValue read fValue write fValue;
  190. property Owned : Boolean read fOwned write fOwned;
  191. end;
  192. TParamList = TObjectList<TParamValue>;
  193. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  194. TScheduleMode = (smRunOnce, smRepeatMode);
  195. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  196. ETaskAddError = class(Exception);
  197. ETaskInitializationError = class(Exception);
  198. ETaskExecutionError = class(Exception);
  199. ETaskParamError = class(Exception);
  200. ETaskSchedulerError = class(Exception);
  201. ITask = interface
  202. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  203. function GetParam(aIndex : Integer) : TFlexValue; overload;
  204. function GetParam(const aName : string) : TFlexValue; overload;
  205. function GetParam2(aIndex : Integer) : PFlexValue;
  206. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  207. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  208. function TaskStatus : TWorkTaskStatus;
  209. function GetNumWorker : Integer;
  210. procedure SetNumWorker(Value : Integer);
  211. function GetIdTask : Int64;
  212. procedure SetIdTask(Value : Int64);
  213. function GetResult : TFlexValue;
  214. procedure SetResult(aValue : TFlexValue);
  215. procedure DoExecute;
  216. procedure DoException(aException : Exception);
  217. procedure DoTerminate;
  218. procedure Enable;
  219. procedure Disable;
  220. {$IFNDEF FPC}
  221. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  222. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  223. {$ELSE}
  224. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  225. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  226. {$ENDIF}
  227. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  228. property Result : TFlexValue read GetResult write SetResult;
  229. property IdTask : Int64 read GetIdTask;
  230. function Done : Boolean;
  231. function Failed : Boolean;
  232. function NumRetries : Integer;
  233. function MaxRetries : Integer;
  234. function LastException : Exception;
  235. function CircuitBreaked : Boolean;
  236. function IsEnabled : Boolean;
  237. end;
  238. {$IFNDEF FPC}
  239. TTaskProc = reference to procedure(task : ITask);
  240. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  241. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  242. {$ELSE}
  243. TTaskProc = procedure(task : ITask) of object;
  244. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  245. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  246. {$ENDIF}
  247. IWorkTask = interface(ITask)
  248. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  249. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  250. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  251. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  252. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  253. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  254. function Retry(aMaxRetries : Integer) : IWorkTask;
  255. function RetryForever : IWorkTask;
  256. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  257. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  258. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  259. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  260. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  261. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  262. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  263. procedure Run;
  264. end;
  265. IScheduledTask = interface(ITask)
  266. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  267. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  268. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  269. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  270. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  271. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  272. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  273. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  274. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  275. function Retry(aMaxRetries : Integer) : IScheduledTask;
  276. function RetryForever : IScheduledTask;
  277. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  278. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  279. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  280. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  281. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  282. function CheckSchedule : Boolean;
  283. procedure DoExpire;
  284. function GetTaskName : string;
  285. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  286. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  287. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  288. function StartOnDayChange : IScheduledTask;
  289. function StartNow : IScheduledTask;
  290. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  291. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  292. procedure RunOnce;
  293. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  294. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  295. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  296. procedure RepeatEveryDay;
  297. procedure RepeatEveryWeek;
  298. function IsFinished : Boolean;
  299. procedure Cancel;
  300. property Name : string read GetTaskName;
  301. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  302. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  303. end;
  304. TTask = class(TInterfacedObject,ITask)
  305. private
  306. fIdTask : Int64;
  307. fNumWorker : Integer;
  308. fNumRetries : Integer;
  309. fParamList : TParamList;
  310. fInitializeProc : TTaskProc;
  311. fExecuteProc : TTaskProc;
  312. fExceptProc : TTaskExceptionProc;
  313. fTerminateProc : TTaskProc;
  314. fExpiredProc : TTaskProc;
  315. fTaskStatus : TWorkTaskStatus;
  316. fOwnedParams : Boolean;
  317. fEnabled : Boolean;
  318. fExecuteWithSync : Boolean;
  319. fExceptionWithSync : Boolean;
  320. fRetryProc : TTaskRetryProc;
  321. fTerminateWithSync : Boolean;
  322. fFaultControl : TFaultControl;
  323. fCustomFaultPolicy : Boolean;
  324. fResult : TFlexValue;
  325. function GetParam(aIndex : Integer) : TFlexValue; overload;
  326. function GetParam(const aName : string) : TFlexValue; overload;
  327. function GetParam2(aIndex : Integer) : PFlexValue;
  328. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  329. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  330. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  331. procedure DoInitialize;
  332. procedure DoExecute;
  333. procedure DoException(aException : Exception);
  334. procedure DoTerminate;
  335. function GetNumWorker : Integer;
  336. procedure SetNumWorker(Value : Integer);
  337. function GetIdTask : Int64;
  338. procedure SetIdTask(Value : Int64);
  339. function GetResult : TFlexValue;
  340. procedure SetResult(aValue : TFlexValue);
  341. protected
  342. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  343. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  344. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  345. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  346. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  347. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  348. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  349. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  350. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  351. public
  352. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  353. destructor Destroy; override;
  354. property IdTask : Int64 read GetIdTask;
  355. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  356. function IsEnabled : Boolean;
  357. function TaskStatus : TWorkTaskStatus;
  358. function Done : Boolean;
  359. function Failed : Boolean;
  360. function NumRetries : Integer;
  361. function MaxRetries : Integer;
  362. function LastException : Exception;
  363. function CircuitBreaked : Boolean;
  364. procedure Disable;
  365. procedure Enable;
  366. end;
  367. TWorkTask = class(TTask,IWorkTask)
  368. public
  369. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  370. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  371. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  372. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  373. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  374. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  375. function Retry(aMaxRetries : Integer) : IWorkTask;
  376. function RetryForever : IWorkTask;
  377. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  378. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  379. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  380. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  381. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  382. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  383. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  384. procedure Run; virtual;
  385. end;
  386. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  387. TScheduledTask = class(TTask,IScheduledTask)
  388. private
  389. fName : string;
  390. fExecutionTimes : Integer;
  391. fScheduleMode : TScheduleMode;
  392. fTimeInterval : Integer;
  393. fTimeMeasure : TTimeMeasure;
  394. fStartDate : TDateTime;
  395. fLastExecution : TDateTime;
  396. fNextExecution : TDateTime;
  397. fExpirationDate : TDateTime;
  398. fExpirationTimes : Integer;
  399. fFinished : Boolean;
  400. fExpireWithSync: Boolean;
  401. procedure ClearSchedule;
  402. function CheckSchedule : Boolean;
  403. procedure DoExpire;
  404. function GetTaskName : string;
  405. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  406. protected
  407. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  408. public
  409. property Name : string read fName write fName;
  410. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  411. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  412. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  413. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  414. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  415. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  416. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  417. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  418. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  419. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  420. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  421. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  422. function StartOnDayChange : IScheduledTask;
  423. function StartNow : IScheduledTask;
  424. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  425. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  426. procedure RunOnce;
  427. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  428. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  429. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  430. procedure RepeatEveryDay;
  431. procedure RepeatEveryWeek;
  432. function Retry(aMaxRetries : Integer) : IScheduledTask;
  433. function RetryForever : IScheduledTask;
  434. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  435. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  436. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  437. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  438. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  439. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  440. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  441. function IsFinished : Boolean;
  442. procedure Cancel;
  443. end;
  444. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  445. TWorker = class(TThread)
  446. protected
  447. fStatus : TWorkerStatus;
  448. fCurrentTask : ITask;
  449. fDefaultFaultPolicy : TFaultPolicy;
  450. procedure ExecuteTask;
  451. procedure TerminateTask;
  452. public
  453. constructor Create;
  454. destructor Destroy; override;
  455. property Status : TWorkerStatus read fStatus;
  456. procedure SetFaultPolicy(aTask : TTask);
  457. procedure Execute; override;
  458. end;
  459. TSimpleWorker = class(TWorker)
  460. private
  461. fRunOnce : Boolean;
  462. public
  463. constructor Create(aTask : ITask; aRunOnce : Boolean = True);
  464. procedure Execute; override;
  465. end;
  466. TQueueWorker = class(TWorker)
  467. private
  468. fCurrentIdTask : Integer;
  469. fNumWorker : Integer;
  470. fTaskQueue : TTaskQueue;
  471. public
  472. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  473. property NumWorker : Integer read fNumWorker;
  474. procedure Execute; override;
  475. end;
  476. TScheduledWorker = class(TWorker)
  477. private
  478. procedure ExpireTask;
  479. public
  480. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  481. procedure Execute; override;
  482. end;
  483. TWorkerPool = TObjectList<TWorker>;
  484. TRunTask = class
  485. public
  486. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  487. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  488. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  489. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  490. end;
  491. IAsyncTask = interface
  492. ['{90A27D06-6FCD-493C-8AA0-C52C5105ED8B}']
  493. procedure Wait; overload;
  494. procedure Wait(const aTimeout : Cardinal); overload;
  495. end;
  496. TAsyncTask = class(TInterfacedObject,IAsyncTask)
  497. private
  498. fProcess : TSimpleThread;
  499. constructor Create(aAction : TProc);
  500. public
  501. class function Run(const aAction : TProc) : IAsyncTask; virtual;
  502. procedure Wait; overload;
  503. procedure Wait(const aTimeout : Cardinal); overload;
  504. destructor Destroy; override;
  505. end;
  506. IAsyncTask<T> = interface
  507. ['{8529BBD4-B5AD-4674-8E42-3C74F5156A97}']
  508. function Result : T; overload;
  509. function Result(const aTimeout : Cardinal) : T; overload;
  510. end;
  511. TAsyncTask<T> = class(TInterfacedObject,IAsyncTask<T>)
  512. private
  513. fProcess : TSimpleThread;
  514. fTaskResult : T;
  515. fWaitForResult : Boolean;
  516. function Result : T; overload;
  517. function Result(const aTimeout : Cardinal) : T; overload;
  518. constructor Create(aAction : TFunc<T>);
  519. public
  520. class function Run(const aAction : TFunc<T>) : IAsyncTask<T>; virtual;
  521. destructor Destroy; override;
  522. end;
  523. TBackgroundTasks = class
  524. private
  525. fMaxQueue : Integer;
  526. fWorkerPool : TWorkerPool;
  527. fConcurrentWorkers : Integer;
  528. fInsertTimeout : Cardinal;
  529. fExtractTimeout : Cardinal;
  530. fTaskQueue : TTaskQueue;
  531. fNumPushedTasks : Int64;
  532. function GetTaskQueue : Cardinal;
  533. public
  534. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  535. destructor Destroy; override;
  536. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  537. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  538. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  539. property TaskQueued : Cardinal read GetTaskQueue;
  540. property NumPushedTasks : Int64 read fNumPushedTasks;
  541. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  542. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  543. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  544. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  545. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  546. procedure Start;
  547. procedure CancelAll;
  548. end;
  549. TScheduledTaskList = TList<IScheduledTask>;
  550. TScheduler = class(TThread)
  551. private
  552. fListLock : TCriticalSection;
  553. fCondVar : TSimpleEvent;
  554. fTaskList : TScheduledTaskList;
  555. fRemoveTaskAfterExpiration : Boolean;
  556. public
  557. constructor Create(aTaskList : TScheduledTaskList);
  558. destructor Destroy; override;
  559. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  560. procedure Execute; override;
  561. function Add(aTask : TScheduledTask) : Integer;
  562. function Get(aIdTask : Int64) : IScheduledTask; overload;
  563. function Get(const aTaskName : string) : IScheduledTask; overload;
  564. end;
  565. TScheduledTasks = class
  566. private
  567. fTaskList : TScheduledTaskList;
  568. fScheduler : TScheduler;
  569. fNumPushedTasks : Int64;
  570. fRemoveTaskAfterExpiration : Boolean;
  571. fIsStarted : Boolean;
  572. fFaultPolicy : TFaultPolicy;
  573. public
  574. constructor Create;
  575. destructor Destroy; override;
  576. property NumPushedTasks : Int64 read fNumPushedTasks;
  577. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  578. property IsStarted : Boolean read fIsStarted;
  579. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  580. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  581. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  582. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  583. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  584. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  585. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  586. procedure Start;
  587. procedure Stop;
  588. end;
  589. TBackgroundWorkers = class
  590. private
  591. fWorkerPool : TWorkerPool;
  592. fConcurrentWorkers : Integer;
  593. fWorkerInitProc : TTaskProc;
  594. fWorkerExecuteProc : TTaskProc;
  595. fWorkerRetryProc : TTaskRetryProc;
  596. fWorkerExceptionProc : TTaskExceptionProc;
  597. fWorkerTerminateProc : TTaskProc;
  598. fFaultPolicy : TFaultPolicy;
  599. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  600. public
  601. constructor Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  602. destructor Destroy; override;
  603. property ConcurrentWorkers : Integer read fConcurrentWorkers;
  604. function OnInitialize(aTaskProc : TTaskProc) : TBackgroundWorkers;
  605. function OnException(aTaskProc : TTaskExceptionProc) : TBackgroundWorkers;
  606. function OnRetry(aTaskProc : TTaskRetryProc) : TBackgroundWorkers;
  607. function OnTerminated(aTaskProc : TTaskProc) : TBackgroundWorkers;
  608. function Retry(aMaxRetries : Integer) : TBackgroundWorkers;
  609. function RetryForever : TBackgroundWorkers;
  610. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  611. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  612. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  613. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  614. procedure Start;
  615. procedure Stop;
  616. end;
  617. implementation
  618. { TThreadedQueueCS<T> }
  619. procedure TThreadedQueueCS<T>.Clear;
  620. var
  621. obj : T;
  622. begin
  623. FQueueLock.Enter;
  624. try
  625. for obj in FQueue do
  626. begin
  627. if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
  628. end;
  629. SetLength(FQueue,0);
  630. finally
  631. FQueueLock.Leave;
  632. end;
  633. {$IFDEF FPC}
  634. FQueueCondVar.SetEvent;
  635. {$ELSE}
  636. FQueueCondVar.ReleaseAll;
  637. {$ENDIF}
  638. end;
  639. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  640. begin
  641. inherited Create;
  642. if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
  643. SetLength(FQueue, AQueueDepth);
  644. FQueueLock := TCriticalSection.Create;
  645. {$IFDEF FPC}
  646. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  647. {$ELSE}
  648. FQueueCondVar := TConditionVariableCS.Create;
  649. {$ENDIF}
  650. FPushTimeout := PushTimeout;
  651. FPopTimeout := PopTimeout;
  652. end;
  653. destructor TThreadedQueueCS<T>.Destroy;
  654. begin
  655. DoShutDown;
  656. FQueueLock.Free;
  657. FQueueCondVar.Free;
  658. inherited;
  659. end;
  660. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  661. begin
  662. FQueueLock.Enter;
  663. try
  664. SetLength(FQueue, Length(FQueue) + ADelta);
  665. finally
  666. FQueueLock.Leave;
  667. end;
  668. {$IFDEF FPC}
  669. FQueueCondVar.SetEvent;
  670. {$ELSE}
  671. FQueueCondVar.ReleaseAll;
  672. {$ENDIF}
  673. end;
  674. function TThreadedQueueCS<T>.PopItem: T;
  675. var
  676. LQueueSize: Integer;
  677. begin
  678. PopItem(LQueueSize, Result);
  679. end;
  680. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  681. begin
  682. AItem := Default(T);
  683. FQueueLock.Enter;
  684. try
  685. Result := wrSignaled;
  686. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  687. begin
  688. {$IFDEF FPC}
  689. Result := FQueueCondVar.WaitFor(FPopTimeout);
  690. {$ELSE}
  691. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  692. {$ENDIF}
  693. end;
  694. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  695. AItem := FQueue[FQueueOffset];
  696. FQueue[FQueueOffset] := Default(T);
  697. if FQueueSize = Length(FQueue) then
  698. begin
  699. {$IFDEF FPC}
  700. FQueueCondVar.SetEvent;
  701. {$ELSE}
  702. FQueueCondVar.ReleaseAll;
  703. {$ENDIF}
  704. end;
  705. Dec(FQueueSize);
  706. Inc(FQueueOffset);
  707. Inc(FTotalItemsPopped);
  708. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  709. finally
  710. AQueueSize := FQueueSize;
  711. FQueueLock.Leave;
  712. end;
  713. end;
  714. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  715. var
  716. LQueueSize: Integer;
  717. begin
  718. Result := PopItem(LQueueSize, AItem);
  719. end;
  720. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  721. begin
  722. PopItem(AQueueSize, Result);
  723. end;
  724. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  725. var
  726. LQueueSize: Integer;
  727. begin
  728. Result := PushItem(AItem, LQueueSize);
  729. end;
  730. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  731. begin
  732. FQueueLock.Enter;
  733. try
  734. if FQueueSize >= High(FQueue) then
  735. begin
  736. if FQueueSize < 1024 then Grow(FQueueSize)
  737. else Grow(FQueueSize Div 2);
  738. end;
  739. Result := wrSignaled;
  740. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  741. begin
  742. {$IFDEF FPC}
  743. Result := FQueueCondVar.WaitFor(FPushTimeout);
  744. {$ELSE}
  745. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  746. {$ENDIF}
  747. end;
  748. if FShutDown or (Result <> wrSignaled) then Exit;
  749. if FQueueSize = 0 then
  750. begin
  751. {$IFDEF FPC}
  752. FQueueCondVar.SetEvent;
  753. {$ELSE}
  754. FQueueCondVar.ReleaseAll;
  755. {$ENDIF}
  756. end;
  757. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  758. Inc(FQueueSize);
  759. Inc(FTotalItemsPushed);
  760. finally
  761. AQueueSize := FQueueSize;
  762. FQueueLock.Leave;
  763. end;
  764. end;
  765. procedure TThreadedQueueCS<T>.DoShutDown;
  766. begin
  767. FShutDown := True;
  768. {$IFDEF FPC}
  769. FQueueCondVar.SetEvent;
  770. {$ELSE}
  771. FQueueCondVar.ReleaseAll;
  772. {$ENDIF}
  773. end;
  774. { TThreadedQueueList<T> }
  775. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  776. begin
  777. inherited Create;
  778. fQueue := TQueue<T>.Create;
  779. fQueue.Capacity := AQueueDepth;
  780. fQueueSize := 0;
  781. fQueueLock := TCriticalSection.Create;
  782. {$IFDEF FPC}
  783. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  784. {$ELSE}
  785. fQueueCondVar := TConditionVariableCS.Create;
  786. {$ENDIF}
  787. fPushTimeout := PushTimeout;
  788. fPopTimeout := PopTimeout;
  789. end;
  790. destructor TThreadedQueueList<T>.Destroy;
  791. begin
  792. DoShutDown;
  793. fQueueCondVar.Free;
  794. fQueueLock.Free;
  795. //fQueueCondVar.Free;
  796. fQueue.Free;
  797. inherited;
  798. end;
  799. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  800. begin
  801. fQueueLock.Enter;
  802. try
  803. fQueue.Capacity := fQueue.Capacity + ADelta;
  804. finally
  805. fQueueLock.Leave;
  806. end;
  807. {$IFDEF FPC}
  808. FQueueCondVar.SetEvent;
  809. {$ELSE}
  810. FQueueCondVar.ReleaseAll;
  811. {$ENDIF}
  812. end;
  813. function TThreadedQueueList<T>.PopItem: T;
  814. var
  815. LQueueSize: Integer;
  816. begin
  817. PopItem(LQueueSize, Result);
  818. end;
  819. {$IFDEF FPC}
  820. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  821. //var
  822. //crono : TChronometer;
  823. begin
  824. AItem := Default(T);
  825. //crono := TChronometer.Create(False);
  826. try
  827. Result := wrSignaled;
  828. //writeln('popitem');
  829. //crono.Start;
  830. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  831. begin
  832. //crono.Start;
  833. Result := FQueueCondVar.WaitFor(FPopTimeout);
  834. //crono.Stop;
  835. //writeln('in: ' + crono.ElapsedTime);
  836. //if result = twaitresult.wrError then result := twaitresult.wrError;
  837. end;
  838. //crono.Stop;
  839. //writeln('out: ' + crono.ElapsedTime);
  840. fQueueLock.Enter;
  841. try
  842. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  843. AItem := fQueue.Extract;
  844. Dec(FQueueSize);
  845. Inc(fTotalItemsPopped);
  846. finally
  847. fQueueLock.Leave;
  848. end;
  849. finally
  850. AQueueSize := fQueueSize;
  851. end;
  852. end;
  853. {$ELSE}
  854. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  855. begin
  856. AItem := Default(T);
  857. fQueueLock.Enter;
  858. try
  859. Result := wrSignaled;
  860. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  861. begin
  862. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  863. end;
  864. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  865. AItem := fQueue.Extract;
  866. if fQueueSize = fQueue.Count then
  867. begin
  868. FQueueCondVar.ReleaseAll;
  869. end;
  870. Dec(FQueueSize);
  871. Inc(fTotalItemsPopped);
  872. finally
  873. AQueueSize := fQueueSize;
  874. fQueueLock.Leave;
  875. end;
  876. end;
  877. {$ENDIF}
  878. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  879. var
  880. LQueueSize: Integer;
  881. begin
  882. Result := PopItem(LQueueSize, AItem);
  883. end;
  884. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  885. begin
  886. PopItem(AQueueSize, Result);
  887. end;
  888. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  889. var
  890. LQueueSize: Integer;
  891. begin
  892. Result := PushItem(AItem, LQueueSize);
  893. end;
  894. {$IFDEF FPC}
  895. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  896. begin
  897. FQueueLock.Enter;
  898. try
  899. if FQueueSize >= fQueue.Count then Grow(10);
  900. Result := wrSignaled;
  901. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  902. //begin
  903. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  904. //end;
  905. if fShutDown or (Result <> wrSignaled) then Exit;
  906. //if fQueueSize = 0 then
  907. //begin
  908. // FQueueCondVar.SetEvent;
  909. //end;
  910. fQueue.Enqueue(AItem);
  911. Inc(FQueueSize);
  912. Inc(fTotalItemsPushed);
  913. finally
  914. AQueueSize := fQueueSize;
  915. FQueueLock.Leave;
  916. //FQueueCondVar.SetEvent;
  917. end;
  918. end;
  919. {$ELSE}
  920. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  921. begin
  922. FQueueLock.Enter;
  923. try
  924. Result := wrSignaled;
  925. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  926. //begin
  927. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  928. //end;
  929. if fShutDown or (Result <> wrSignaled) then Exit;
  930. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  931. fQueue.Enqueue(AItem);
  932. Inc(FQueueSize);
  933. Inc(fTotalItemsPushed);
  934. finally
  935. AQueueSize := fQueueSize;
  936. FQueueLock.Leave;
  937. end;
  938. end;
  939. {$ENDIF}
  940. procedure TThreadedQueueList<T>.DoShutDown;
  941. begin
  942. fShutDown := True;
  943. {$IFDEF FPC}
  944. FQueueCondVar.SetEvent;
  945. {$ELSE}
  946. FQueueCondVar.ReleaseAll;
  947. {$ENDIF}
  948. end;
  949. {$IFNDEF FPC}
  950. { TThreadObjectList<T> }
  951. procedure TThreadObjectList<T>.Add(const Item: T);
  952. begin
  953. LockList;
  954. try
  955. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  956. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  957. finally
  958. UnlockList;
  959. end;
  960. end;
  961. procedure TThreadObjectList<T>.Clear;
  962. begin
  963. LockList;
  964. try
  965. fList.Clear;
  966. finally
  967. UnlockList;
  968. end;
  969. end;
  970. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  971. begin
  972. inherited Create;
  973. fLock := TObject.Create;
  974. fList := TObjectList<T>.Create;
  975. fDuplicates := dupIgnore;
  976. end;
  977. destructor TThreadObjectList<T>.Destroy;
  978. begin
  979. LockList;
  980. try
  981. fList.Free;
  982. inherited Destroy;
  983. finally
  984. UnlockList;
  985. fLock.Free;
  986. end;
  987. end;
  988. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  989. begin
  990. LockList;
  991. try
  992. Result := fList[aIndex];
  993. finally
  994. UnlockList;
  995. end;
  996. end;
  997. function TThreadObjectList<T>.LockList: TObjectList<T>;
  998. begin
  999. System.TMonitor.Enter(fLock);
  1000. Result := fList;
  1001. end;
  1002. procedure TThreadObjectList<T>.Remove(const Item: T);
  1003. begin
  1004. RemoveItem(Item, TDirection.FromBeginning);
  1005. end;
  1006. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  1007. begin
  1008. LockList;
  1009. try
  1010. fList.RemoveItem(Item, Direction);
  1011. finally
  1012. UnlockList;
  1013. end;
  1014. end;
  1015. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  1016. begin
  1017. LockList;
  1018. try
  1019. fList[aIndex] := aValue;
  1020. finally
  1021. UnlockList;
  1022. end;
  1023. end;
  1024. procedure TThreadObjectList<T>.UnlockList;
  1025. begin
  1026. System.TMonitor.Exit(fLock);
  1027. end;
  1028. {$ENDIF}
  1029. { TAnonymousThread }
  1030. constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  1031. begin
  1032. fThread := TAdvThread.Create(aProc,aSynchronize);
  1033. end;
  1034. class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  1035. begin
  1036. Result := TAnonymousThread.Create(aProc,False);
  1037. end;
  1038. class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  1039. begin
  1040. Result := TAnonymousThread.Create(aProc,True);
  1041. end;
  1042. function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  1043. begin
  1044. Result := Self;
  1045. fThread.OnException(aProc);
  1046. end;
  1047. function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  1048. begin
  1049. Result := Self;
  1050. fThread.OnTerminate(aProc,False);
  1051. end;
  1052. function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  1053. begin
  1054. Result := Self;
  1055. fThread.OnTerminate(aProc,True);
  1056. end;
  1057. procedure TAnonymousThread.Start;
  1058. begin
  1059. fThread.Start;
  1060. end;
  1061. { TTask }
  1062. constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  1063. var
  1064. i : Integer;
  1065. begin
  1066. fTaskStatus := TWorkTaskStatus.wtsPending;
  1067. fCustomFaultPolicy := False;
  1068. fNumRetries := 0;
  1069. fExecuteWithSync := False;
  1070. fTerminateWithSync := False;
  1071. fExceptionWithSync := False;
  1072. fFaultControl := TFaultControl.Create;
  1073. fFaultControl.OnRetry := DoRetry;
  1074. fOwnedParams := aOwnedParams;
  1075. fParamList := TParamList.Create(True);
  1076. for i := Low(aParamArray) to High(aParamArray) do
  1077. begin
  1078. fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams));
  1079. {$IFDEF FPC}
  1080. fParamList[i].Value._AddRef;
  1081. {$ENDIF}
  1082. end;
  1083. fExecuteProc := aTaskProc;
  1084. fEnabled := False;
  1085. end;
  1086. destructor TTask.Destroy;
  1087. begin
  1088. fFaultControl.Free;
  1089. //free passed params
  1090. fParamList.Free;
  1091. if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free;
  1092. inherited;
  1093. end;
  1094. procedure TTask.Disable;
  1095. begin
  1096. fEnabled := False;
  1097. end;
  1098. procedure TTask.DoException(aException : Exception);
  1099. begin
  1100. fTaskStatus := TWorkTaskStatus.wtsException;
  1101. if Assigned(fExceptProc) then fExceptProc(Self,aException)
  1102. else raise aException;
  1103. end;
  1104. procedure TTask.DoExecute;
  1105. begin
  1106. fTaskStatus := TWorkTaskStatus.wtsRunning;
  1107. DoInitialize;
  1108. repeat
  1109. try
  1110. if Assigned(fExecuteProc) then fExecuteProc(Self);
  1111. fTaskStatus := TWorkTaskStatus.wtsDone;
  1112. fFaultControl.SuccessExecution;
  1113. except
  1114. on E : Exception do
  1115. begin
  1116. fTaskStatus := TWorkTaskStatus.wtsException;
  1117. {$IFNDEF FPC}
  1118. {$IF DELPHIRX10_UP}
  1119. fFaultControl.FailedExecution(AcquireExceptionObject as Exception);
  1120. {$ELSE}
  1121. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1122. {$ENDIF}
  1123. {$ELSE}
  1124. fFaultControl.FailedExecution(Exception(AcquireExceptionObject));
  1125. {$ENDIF}
  1126. end;
  1127. end;
  1128. until not fFaultControl.NeedToRetry;
  1129. end;
  1130. procedure TTask.DoInitialize;
  1131. begin
  1132. try
  1133. fFaultControl.Reset;
  1134. if Assigned(fInitializeProc) then fInitializeProc(Self);
  1135. except
  1136. on E : Exception do
  1137. begin
  1138. raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
  1139. end;
  1140. end;
  1141. end;
  1142. function TTask.Done: Boolean;
  1143. begin
  1144. Result := not fFaultControl.TaskFailed;
  1145. end;
  1146. function TTask.Failed: Boolean;
  1147. begin
  1148. Result := fFaultControl.TaskFailed;
  1149. end;
  1150. function TTask.CircuitBreaked: Boolean;
  1151. begin
  1152. Result := fFaultControl.CircuitBreaked;
  1153. end;
  1154. function TTask.LastException: Exception;
  1155. begin
  1156. Result := fFaultControl.LastException;
  1157. end;
  1158. function TTask.MaxRetries: Integer;
  1159. begin
  1160. Result := fFaultControl.MaxRetries;
  1161. end;
  1162. function TTask.NumRetries: Integer;
  1163. begin
  1164. Result := fFaultControl.NumRetries;
  1165. end;
  1166. procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  1167. begin
  1168. vStopRetries := False;
  1169. if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries);
  1170. end;
  1171. procedure TTask.DoTerminate;
  1172. begin
  1173. if Assigned(fTerminateProc) then fTerminateProc(Self);
  1174. end;
  1175. procedure TTask.Enable;
  1176. begin
  1177. fEnabled := True;
  1178. end;
  1179. function TTask.GetIdTask: Int64;
  1180. begin
  1181. Result := fIdTask;
  1182. end;
  1183. procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  1184. begin
  1185. {$IFDEF FPC}
  1186. if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
  1187. {$ENDIF}
  1188. fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
  1189. fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
  1190. fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  1191. end;
  1192. procedure TTask.SetIdTask(Value : Int64);
  1193. begin
  1194. fIdTask := Value;
  1195. end;
  1196. function TTask.GetNumWorker: Integer;
  1197. begin
  1198. Result := fNumWorker;
  1199. end;
  1200. function TTask.GetParam(aIndex: Integer): TFlexValue;
  1201. begin
  1202. Result := fParamList[aIndex].Value;
  1203. end;
  1204. function TTask.GetParam(const aName: string): TFlexValue;
  1205. var
  1206. paramvalue : TParamValue;
  1207. begin
  1208. for paramvalue in fParamList do
  1209. begin
  1210. if CompareText(paramvalue.Name,aName) = 0 then
  1211. begin
  1212. Exit(paramvalue.Value)
  1213. end;
  1214. end;
  1215. //if not exists
  1216. raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  1217. end;
  1218. function TTask.GetParam2(aIndex: Integer): PFlexValue;
  1219. begin
  1220. Result := @fParamList[aIndex].Value;
  1221. end;
  1222. function TTask.GetResult: TFlexValue;
  1223. begin
  1224. Result := fResult;
  1225. end;
  1226. function TTask.IsEnabled: Boolean;
  1227. begin
  1228. Result := fEnabled;
  1229. end;
  1230. procedure TTask.SetNumWorker(Value: Integer);
  1231. begin
  1232. fTaskStatus := TWorkTaskStatus.wtsAssigned;
  1233. fNumWorker := Value;
  1234. end;
  1235. procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  1236. begin
  1237. if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
  1238. fParamList[aIndex].Value := Value;
  1239. end;
  1240. procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  1241. var
  1242. paramvalue : TParamValue;
  1243. begin
  1244. //check if already exists parameter
  1245. for paramvalue in fParamList do
  1246. begin
  1247. if CompareText(paramvalue.Name,aName) = 0 then
  1248. begin
  1249. paramvalue.Value := Value;
  1250. Exit;
  1251. end;
  1252. end;
  1253. //if not exists, create one
  1254. fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  1255. end;
  1256. procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  1257. begin
  1258. SetParam(aName,Value,False);
  1259. end;
  1260. procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  1261. begin
  1262. fFaultControl.MaxRetries := aMaxRetries;
  1263. fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
  1264. fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  1265. fCustomFaultPolicy := True;
  1266. end;
  1267. procedure TTask.SetResult(aValue: TFlexValue);
  1268. begin
  1269. fResult := aValue;
  1270. end;
  1271. procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  1272. begin
  1273. fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1;
  1274. fFaultControl.WaitTimeBetweenRetriesMS := 0;
  1275. fFaultControl.WaitTimeMultiplierFactor := 1;
  1276. fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  1277. fCustomFaultPolicy := True;
  1278. end;
  1279. function TTask.TaskStatus: TWorkTaskStatus;
  1280. begin
  1281. Result := fTaskStatus;
  1282. end;
  1283. { TWorkTask }
  1284. function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  1285. begin
  1286. fExceptProc := aTaskProc;
  1287. Result := Self;
  1288. end;
  1289. function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  1290. begin
  1291. fExceptionWithSync := True;
  1292. Result := OnException(aTaskProc);
  1293. end;
  1294. function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  1295. begin
  1296. fInitializeProc := aTaskProc;
  1297. Result := Self;
  1298. end;
  1299. function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  1300. begin
  1301. fRetryProc := aTaskProc;
  1302. Result := Self;
  1303. end;
  1304. function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  1305. begin
  1306. fTerminateProc := aTaskProc;
  1307. Result := Self;
  1308. end;
  1309. function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  1310. begin
  1311. fTerminateWithSync := True;
  1312. Result := OnTerminated(aTaskProc);
  1313. end;
  1314. procedure TWorkTask.Run;
  1315. begin
  1316. fEnabled := True;
  1317. end;
  1318. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  1319. begin
  1320. Result := Self;
  1321. SetParam(aName,aValue);
  1322. end;
  1323. function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  1324. begin
  1325. Result := Self;
  1326. SetParam(aName,aValue,aOwned);
  1327. end;
  1328. function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  1329. begin
  1330. Result := Self;
  1331. SetRetryPolicy(aMaxRetries,0,1);
  1332. end;
  1333. function TWorkTask.RetryForever: IWorkTask;
  1334. begin
  1335. Result := Self;
  1336. SetRetryPolicy(-1,0,1);
  1337. end;
  1338. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1339. begin
  1340. Result := Self;
  1341. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1342. end;
  1343. function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  1344. begin
  1345. Result := Self;
  1346. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1347. end;
  1348. function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  1349. begin
  1350. Result := Self;
  1351. SetRetryPolicy(aWaitTimeArray);
  1352. end;
  1353. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  1354. begin
  1355. Result := Self;
  1356. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1357. end;
  1358. function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  1359. begin
  1360. Result := Self;
  1361. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1362. end;
  1363. { TBackgroundTasks }
  1364. procedure TBackgroundTasks.CancelAll;
  1365. begin
  1366. fTaskQueue.Clear;
  1367. end;
  1368. constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  1369. begin
  1370. fMaxQueue := 0;
  1371. fConcurrentWorkers := aConcurrentWorkers;
  1372. fInsertTimeout := INFINITE;
  1373. fExtractTimeout := 5000;
  1374. fNumPushedTasks := 0;
  1375. fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  1376. end;
  1377. destructor TBackgroundTasks.Destroy;
  1378. begin
  1379. CancelAll;
  1380. fTaskQueue.DoShutDown;
  1381. //while fTaskQueue.QueueSize > 0 do Sleep(0);
  1382. if Assigned(fWorkerPool) then fWorkerPool.Free;
  1383. if Assigned(fTaskQueue) then fTaskQueue.Free;
  1384. inherited;
  1385. end;
  1386. function TBackgroundTasks.GetTaskQueue: Cardinal;
  1387. begin
  1388. Result := fTaskQueue.QueueSize;
  1389. end;
  1390. function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  1391. begin
  1392. Result := AddTask([],False,aTaskProc);
  1393. end;
  1394. function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  1395. var
  1396. worktask : IWorkTask;
  1397. begin
  1398. if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
  1399. worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1400. Inc(fNumPushedTasks);
  1401. worktask.SetIdTask(fNumPushedTasks);
  1402. if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
  1403. begin
  1404. Result := worktask;
  1405. end;
  1406. end;
  1407. function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  1408. begin
  1409. Result := AddTask(aParamArray,aOwnedParams,aTaskProc);
  1410. TTask(Result).ExecuteWithSync := True;
  1411. end;
  1412. function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  1413. begin
  1414. Result := AddTask_Sync([],False,aTaskProc);
  1415. end;
  1416. procedure TBackgroundTasks.Start;
  1417. var
  1418. i : Integer;
  1419. worker : TWorker;
  1420. begin
  1421. //create workers
  1422. if fWorkerPool <> nil then fWorkerPool.Free;
  1423. fWorkerPool := TWorkerPool.Create(True);
  1424. for i := 1 to fConcurrentWorkers do
  1425. begin
  1426. worker := TQueueWorker.Create(i,fTaskQueue);
  1427. fWorkerPool.Add(worker);
  1428. worker.Start;
  1429. end;
  1430. end;
  1431. { TWorker }
  1432. constructor TWorker.Create;
  1433. begin
  1434. inherited Create(True);
  1435. fDefaultFaultPolicy := TFaultPolicy.Create;
  1436. fStatus := TWorkerStatus.wsSuspended;
  1437. FreeOnTerminate := False;
  1438. end;
  1439. destructor TWorker.Destroy;
  1440. begin
  1441. if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free;
  1442. inherited;
  1443. end;
  1444. procedure TWorker.SetFaultPolicy(aTask: TTask);
  1445. begin
  1446. if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy);
  1447. end;
  1448. procedure TWorker.Execute;
  1449. begin
  1450. end;
  1451. procedure TWorker.ExecuteTask;
  1452. begin
  1453. fCurrentTask.DoExecute;
  1454. end;
  1455. procedure TWorker.TerminateTask;
  1456. begin
  1457. fCurrentTask.DoTerminate;
  1458. end;
  1459. { TSimpleWorker }
  1460. constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
  1461. begin
  1462. inherited Create;
  1463. fRunOnce := aRunOnce;
  1464. fCurrentTask := aTask;
  1465. FreeOnTerminate := True;
  1466. end;
  1467. procedure TSimpleWorker.Execute;
  1468. begin
  1469. fStatus := TWorkerStatus.wsIdle;
  1470. while not Terminated do
  1471. begin
  1472. if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then
  1473. try
  1474. fStatus := TWorkerStatus.wsWorking;
  1475. try
  1476. SetFaultPolicy(TTask(fCurrentTask));
  1477. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1478. else fCurrentTask.DoExecute;
  1479. except
  1480. on E : Exception do
  1481. begin
  1482. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1483. else raise ETaskExecutionError.Create(e.Message);
  1484. end;
  1485. end;
  1486. finally
  1487. fStatus := TWorkerStatus.wsIdle;
  1488. try
  1489. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1490. else fCurrentTask.DoTerminate;
  1491. except
  1492. on E : Exception do if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1493. end;
  1494. if fRunOnce then Terminate;
  1495. end;
  1496. end;
  1497. fStatus := TWorkerStatus.wsSuspended
  1498. end;
  1499. { TQueueWorker }
  1500. constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  1501. begin
  1502. inherited Create;
  1503. fNumWorker := aNumWorker;
  1504. fTaskQueue := aTaskQueue;
  1505. end;
  1506. procedure TQueueWorker.Execute;
  1507. begin
  1508. fStatus := TWorkerStatus.wsIdle;
  1509. while not Terminated do
  1510. begin
  1511. fCurrentTask := fTaskQueue.PopItem;
  1512. if fCurrentTask <> nil then
  1513. try
  1514. fStatus := TWorkerStatus.wsWorking;
  1515. try
  1516. fCurrentIdTask := fCurrentTask.GetIdTask;
  1517. SetFaultPolicy(TTask(fCurrentTask));
  1518. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1519. else fCurrentTask.DoExecute;
  1520. except
  1521. on E : Exception do
  1522. begin
  1523. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1524. else raise ETaskExecutionError.Create(e.Message);
  1525. end;
  1526. end;
  1527. finally
  1528. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1529. else fCurrentTask.DoTerminate;
  1530. fStatus := TWorkerStatus.wsIdle;
  1531. end;
  1532. end;
  1533. fStatus := TWorkerStatus.wsSuspended
  1534. end;
  1535. { TScheduledWorker }
  1536. constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  1537. begin
  1538. inherited Create;
  1539. {$IFNDEF DELPHILINUX}
  1540. NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
  1541. {$ENDIF}
  1542. FreeOnTerminate := True;
  1543. fCurrentTask := aScheduledTask;
  1544. end;
  1545. procedure TScheduledWorker.Execute;
  1546. begin
  1547. fStatus := TWorkerStatus.wsIdle;
  1548. if Assigned(fCurrentTask) then
  1549. begin
  1550. try
  1551. fStatus := TWorkerStatus.wsWorking;
  1552. try
  1553. SetFaultPolicy(TTask(fCurrentTask));
  1554. if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
  1555. else fCurrentTask.DoExecute;
  1556. fStatus := TWorkerStatus.wsIdle;
  1557. except
  1558. on E : Exception do
  1559. begin
  1560. if fCurrentTask <> nil then fCurrentTask.DoException(E)
  1561. else raise ETaskExecutionError.Create(e.Message);
  1562. end;
  1563. end;
  1564. finally
  1565. if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
  1566. else fCurrentTask.DoTerminate;
  1567. //check if expired
  1568. if (fCurrentTask as IScheduledTask).IsFinished then
  1569. begin
  1570. if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
  1571. else (fCurrentTask as IScheduledTask).DoExpire;
  1572. end;
  1573. end;
  1574. end;
  1575. fCurrentTask := nil;
  1576. fStatus := TWorkerStatus.wsSuspended;
  1577. end;
  1578. procedure TScheduledWorker.ExpireTask;
  1579. begin
  1580. (fCurrentTask as IScheduledTask).DoExpire;
  1581. end;
  1582. { TScheduledTasks }
  1583. function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  1584. begin
  1585. Result := AddTask(aTaskName,[],False,aTaskProc);
  1586. end;
  1587. function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1588. var
  1589. scheduletask : TScheduledTask;
  1590. begin
  1591. scheduletask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  1592. scheduletask.Name := aTaskName;
  1593. Inc(fNumPushedTasks);
  1594. scheduletask.SetIdTask(fNumPushedTasks);
  1595. fTaskList.Add(scheduletask);
  1596. Result := scheduletask;
  1597. end;
  1598. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  1599. begin
  1600. Result := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
  1601. TTask(Result).ExecuteWithSync := True;
  1602. end;
  1603. function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  1604. begin
  1605. Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  1606. end;
  1607. constructor TScheduledTasks.Create;
  1608. begin
  1609. fNumPushedTasks := 0;
  1610. fIsStarted := False;
  1611. fFaultPolicy := TFaultPolicy.Create;
  1612. fTaskList := TScheduledTaskList.Create;
  1613. end;
  1614. destructor TScheduledTasks.Destroy;
  1615. begin
  1616. if Assigned(fScheduler) then
  1617. begin
  1618. fScheduler.Terminate;
  1619. fScheduler.WaitFor;
  1620. fScheduler.Free;
  1621. end;
  1622. if Assigned(fTaskList) then fTaskList.Free;
  1623. if Assigned(fFaultPolicy) then fFaultPolicy.Free;
  1624. inherited;
  1625. end;
  1626. function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  1627. begin
  1628. Result := fScheduler.Get(aIdTask);
  1629. end;
  1630. function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  1631. begin
  1632. if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
  1633. Result := fScheduler.Get(aTaskName);
  1634. end;
  1635. procedure TScheduledTasks.Start;
  1636. begin
  1637. if fIsStarted then Exit;
  1638. if not Assigned(fScheduler) then
  1639. begin
  1640. fScheduler := TScheduler.Create(fTaskList);
  1641. fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  1642. end;
  1643. fScheduler.Start;
  1644. fIsStarted := True;
  1645. end;
  1646. procedure TScheduledTasks.Stop;
  1647. begin
  1648. if Assigned(fScheduler) then fScheduler.Terminate;
  1649. fIsStarted := False;
  1650. end;
  1651. { TScheduledTask }
  1652. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  1653. begin
  1654. Result := Self;
  1655. SetParam(aName,aValue);
  1656. end;
  1657. function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  1658. begin
  1659. Result := Self;
  1660. SetParam(aName,aValue);
  1661. end;
  1662. function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  1663. begin
  1664. Result := Self;
  1665. ClearSchedule;
  1666. fScheduleMode := TScheduleMode.smRunOnce;
  1667. fStartDate := aStartDate;
  1668. fNextExecution := aStartDate;
  1669. end;
  1670. function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  1671. begin
  1672. Result := Self;
  1673. ClearSchedule;
  1674. fScheduleMode := TScheduleMode.smRunOnce;
  1675. fStartDate := IncMinute(Now(),aMinutes);
  1676. fNextExecution := fStartDate;
  1677. end;
  1678. function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  1679. begin
  1680. Result := Self;
  1681. ClearSchedule;
  1682. fScheduleMode := TScheduleMode.smRunOnce;
  1683. fStartDate := IncSecond(Now(),aSeconds);
  1684. fNextExecution := fStartDate;
  1685. end;
  1686. function TScheduledTask.StartNow: IScheduledTask;
  1687. begin
  1688. Result := Self;
  1689. ClearSchedule;
  1690. fScheduleMode := TScheduleMode.smRunOnce;
  1691. fStartDate := Now();
  1692. fNextExecution := fStartDate;
  1693. end;
  1694. function TScheduledTask.StartOnDayChange: IScheduledTask;
  1695. begin
  1696. Result := Self;
  1697. ClearSchedule;
  1698. fScheduleMode := TScheduleMode.smRunOnce;
  1699. fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
  1700. fNextExecution := fStartDate;
  1701. end;
  1702. function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1703. begin
  1704. Result := Self;
  1705. ClearSchedule;
  1706. fScheduleMode := TScheduleMode.smRunOnce;
  1707. fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
  1708. fNextExecution := fStartDate;
  1709. end;
  1710. function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  1711. begin
  1712. Result := Self;
  1713. ClearSchedule;
  1714. fScheduleMode := TScheduleMode.smRunOnce;
  1715. fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
  1716. fNextExecution := fStartDate;
  1717. end;
  1718. function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  1719. begin
  1720. Result := Self;
  1721. SetRetryPolicy(aMaxRetries,0,1);
  1722. end;
  1723. function TScheduledTask.RetryForever: IScheduledTask;
  1724. begin
  1725. Result := Self;
  1726. SetRetryPolicy(-1,0,1);
  1727. end;
  1728. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1729. begin
  1730. Result := Self;
  1731. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  1732. end;
  1733. function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  1734. begin
  1735. Result := Self;
  1736. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1737. end;
  1738. function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  1739. begin
  1740. Result := Self;
  1741. SetRetryPolicy(aWaitTimeArray);
  1742. end;
  1743. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  1744. begin
  1745. Result := Self;
  1746. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  1747. end;
  1748. function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  1749. begin
  1750. Result := Self;
  1751. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  1752. end;
  1753. procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  1754. begin
  1755. if fStartDate = 0.0 then ClearSchedule;
  1756. fScheduleMode := TScheduleMode.smRepeatMode;
  1757. fTimeMeasure := aTimeMeasure;
  1758. fTimeInterval := aInterval;
  1759. if fStartDate < Now() then fStartDate := Now();
  1760. fNextExecution := fStartDate;
  1761. fEnabled := True;
  1762. end;
  1763. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  1764. begin
  1765. if fStartDate = 0.0 then ClearSchedule;
  1766. fScheduleMode := TScheduleMode.smRepeatMode;
  1767. fTimeMeasure := aTimeMeasure;
  1768. fTimeInterval := aInterval;
  1769. if fStartDate < Now() then fStartDate := Now();
  1770. fExpirationDate := aEndTime;
  1771. fNextExecution := fStartDate;
  1772. fEnabled := True;
  1773. end;
  1774. procedure TScheduledTask.RepeatEveryDay;
  1775. begin
  1776. RepeatEvery(1,tmDays);
  1777. end;
  1778. procedure TScheduledTask.RepeatEveryWeek;
  1779. begin
  1780. RepeatEvery(7,tmDays);
  1781. end;
  1782. procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  1783. begin
  1784. if fStartDate = 0.0 then ClearSchedule;
  1785. fScheduleMode := TScheduleMode.smRepeatMode;
  1786. fTimeMeasure := aTimeMeasure;
  1787. fTimeInterval := aInterval;
  1788. if fStartDate < Now() then fStartDate := Now();
  1789. fExpirationTimes := aRepeatTimes;
  1790. fNextExecution := fStartDate;
  1791. fEnabled := True;
  1792. end;
  1793. procedure TScheduledTask.RunOnce;
  1794. begin
  1795. fScheduleMode := TScheduleMode.smRunOnce;
  1796. if fStartDate < Now() then fStartDate := Now();
  1797. fNextExecution := fStartDate;
  1798. fEnabled := True;
  1799. end;
  1800. procedure TScheduledTask.Cancel;
  1801. begin
  1802. fFinished := True;
  1803. end;
  1804. function TScheduledTask.CheckSchedule: Boolean;
  1805. begin
  1806. Result := False;
  1807. if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;
  1808. if fScheduleMode = TScheduleMode.smRunOnce then
  1809. begin
  1810. //if start date reached
  1811. if (fExecutionTimes = 0) and (Now() >= fNextExecution) then
  1812. begin
  1813. fLastExecution := Now();
  1814. Inc(fExecutionTimes);
  1815. fFinished := True;
  1816. Result := True;
  1817. end;
  1818. end
  1819. else
  1820. begin
  1821. //if next execution reached
  1822. if Now() >= fNextExecution then
  1823. begin
  1824. //check expiration limits
  1825. if ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
  1826. ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate)) then
  1827. begin
  1828. fFinished := True;
  1829. Exit;
  1830. end;
  1831. //calculate next execution
  1832. case fTimeMeasure of
  1833. tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
  1834. tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
  1835. tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
  1836. tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
  1837. tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
  1838. end;
  1839. if Now() > fNextExecution then Result := False //avoid execution if system time was altered
  1840. else
  1841. begin
  1842. fLastExecution := Now();
  1843. Inc(fExecutionTimes);
  1844. Result := True;
  1845. end;
  1846. end;
  1847. end;
  1848. end;
  1849. procedure TScheduledTask.ClearSchedule;
  1850. begin
  1851. inherited;
  1852. fFinished := False;
  1853. fStartDate := 0.0;
  1854. fLastExecution := 0.0;
  1855. fNextExecution := 0.0;
  1856. fExpirationDate := 0.0;
  1857. fScheduleMode := TScheduleMode.smRunOnce;
  1858. fTimeMeasure := TTimeMeasure.tmSeconds;
  1859. fTimeInterval := 0;
  1860. end;
  1861. procedure TScheduledTask.DoExpire;
  1862. begin
  1863. if Assigned(fExpiredProc) then fExpiredProc(Self);
  1864. fEnabled := False;
  1865. end;
  1866. function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  1867. begin
  1868. Result := TPair<TTimeMeasure, Integer>.Create(fTimeMeasure, fTimeInterval);
  1869. end;
  1870. function TScheduledTask.GetTaskName: string;
  1871. begin
  1872. Result := fName;
  1873. end;
  1874. function TScheduledTask.IsFinished: Boolean;
  1875. begin
  1876. Result := fFinished;
  1877. end;
  1878. function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1879. begin
  1880. fExceptProc := aTaskProc;
  1881. Result := Self;
  1882. end;
  1883. function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  1884. begin
  1885. Result := OnException(aTaskProc);
  1886. TTask(Result).ExceptionWithSync := True;
  1887. end;
  1888. function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  1889. begin
  1890. fRetryProc := aTaskProc;
  1891. Result := Self;
  1892. end;
  1893. function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  1894. begin
  1895. fExpiredProc := aTaskProc;
  1896. Result := Self;
  1897. end;
  1898. function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1899. begin
  1900. Result := OnExpired(aTaskProc);
  1901. TScheduledTask(Result).ExpireWithSync := True;
  1902. end;
  1903. function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  1904. begin
  1905. fInitializeProc := aTaskProc;
  1906. Result := Self;
  1907. end;
  1908. function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  1909. begin
  1910. fTerminateProc := aTaskProc;
  1911. Result := Self;
  1912. end;
  1913. function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  1914. begin
  1915. Result := OnTerminated(aTaskProc);
  1916. TTask(Result).TerminateWithSync := True;
  1917. end;
  1918. { TScheduler }
  1919. constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  1920. begin
  1921. inherited Create(True);
  1922. FreeOnTerminate := False;
  1923. fListLock := TCriticalSection.Create;
  1924. fRemoveTaskAfterExpiration := False;
  1925. fTaskList := aTaskList;
  1926. {$IFDEF FPC}
  1927. fCondVar := TSimpleEvent.Create;
  1928. {$ELSE}
  1929. fCondVar := TSimpleEvent.Create(nil,True,False,'');
  1930. {$ENDIF}
  1931. end;
  1932. destructor TScheduler.Destroy;
  1933. begin
  1934. fCondVar.SetEvent;
  1935. fCondVar.Free;
  1936. fTaskList := nil;
  1937. fListLock.Free;
  1938. inherited;
  1939. end;
  1940. procedure TScheduler.Execute;
  1941. var
  1942. task : IScheduledTask;
  1943. worker : TScheduledWorker;
  1944. numworker : Int64;
  1945. begin
  1946. numworker := 0;
  1947. while not Terminated do
  1948. begin
  1949. fListLock.Enter;
  1950. try
  1951. for task in fTaskList do
  1952. begin
  1953. if (task.IsEnabled) and (not task.IsFinished) then
  1954. begin
  1955. if task.CheckSchedule then
  1956. begin
  1957. Inc(numworker);
  1958. worker := TScheduledWorker.Create(numworker,task);
  1959. worker.Start;
  1960. end;
  1961. end
  1962. else
  1963. begin
  1964. if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then fTaskList.Remove(task);
  1965. end;
  1966. end;
  1967. task := nil;
  1968. finally
  1969. fListLock.Leave;
  1970. end;
  1971. fCondVar.WaitFor(250);
  1972. end;
  1973. end;
  1974. function TScheduler.Add(aTask: TScheduledTask): Integer;
  1975. begin
  1976. Result := fTaskList.Add(aTask);
  1977. end;
  1978. function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  1979. var
  1980. task : IScheduledTask;
  1981. begin
  1982. fListLock.Enter;
  1983. try
  1984. for task in fTaskList do
  1985. begin
  1986. if task.IdTask = aIdTask then Exit(task);
  1987. end;
  1988. finally
  1989. fListLock.Leave;
  1990. end;
  1991. end;
  1992. function TScheduler.Get(const aTaskName: string): IScheduledTask;
  1993. var
  1994. task : IScheduledTask;
  1995. begin
  1996. fListLock.Enter;
  1997. try
  1998. for task in fTaskList do
  1999. begin
  2000. if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  2001. end;
  2002. finally
  2003. fListLock.Leave;
  2004. end;
  2005. end;
  2006. { TAdvThread }
  2007. constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  2008. begin
  2009. inherited Create(True);
  2010. FreeOnTerminate := True;
  2011. fExecuteWithSync := aSynchronize;
  2012. fExecuteProc := aProc;
  2013. end;
  2014. procedure TAdvThread.DoExecute;
  2015. begin
  2016. try
  2017. if Assigned(fExecuteProc) then fExecuteProc;
  2018. except
  2019. on E : Exception do
  2020. begin
  2021. {$IFNDEF FPC}
  2022. {$IF DELPHIRX10_UP}
  2023. if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception)
  2024. {$ELSE}
  2025. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  2026. {$ENDIF}
  2027. {$ELSE}
  2028. if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject))
  2029. {$ENDIF}
  2030. else raise e;
  2031. end;
  2032. end;
  2033. end;
  2034. procedure TAdvThread.CallToTerminate;
  2035. begin
  2036. if Assigned(fTerminateProc) then fTerminateProc;
  2037. end;
  2038. procedure TAdvThread.DoTerminate;
  2039. begin
  2040. if fTerminateWithSync then Synchronize(CallToTerminate)
  2041. else CallToTerminate;
  2042. end;
  2043. procedure TAdvThread.Execute;
  2044. begin
  2045. if fExecuteWithSync then Synchronize(DoExecute)
  2046. else DoExecute;
  2047. end;
  2048. procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  2049. begin
  2050. fExceptionProc := aProc;
  2051. end;
  2052. procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  2053. begin
  2054. fTerminateWithSync := aSynchronize;
  2055. fTerminateProc := aProc;
  2056. end;
  2057. { TRunTask }
  2058. class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  2059. begin
  2060. Result := Execute([],False,aTaskProc);
  2061. end;
  2062. class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  2063. begin
  2064. Result := Execute_Sync([],False,aTaskProc);
  2065. end;
  2066. class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2067. var
  2068. task : TWorkTask;
  2069. worker : TSimpleWorker;
  2070. begin
  2071. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2072. task.ExecuteWithSync := False;
  2073. Result := task;
  2074. worker := TSimpleWorker.Create(task);
  2075. worker.Start;
  2076. end;
  2077. class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  2078. var
  2079. task : TWorkTask;
  2080. worker : TSimpleWorker;
  2081. begin
  2082. task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
  2083. task.ExecuteWithSync := True;
  2084. Result := task;
  2085. worker := TSimpleWorker.Create(task);
  2086. worker.Start;
  2087. end;
  2088. { TAsyncTask }
  2089. constructor TAsyncTask.Create(aAction : TProc);
  2090. begin
  2091. fProcess := TSimpleThread.Create(aAction,False,True);
  2092. end;
  2093. destructor TAsyncTask.Destroy;
  2094. begin
  2095. inherited;
  2096. end;
  2097. class function TAsyncTask.Run(const aAction : TProc) : IAsyncTask;
  2098. begin
  2099. Result := TAsyncTask.Create(aAction);
  2100. end;
  2101. procedure TAsyncTask.Wait(const aTimeout: Cardinal);
  2102. begin
  2103. if aTimeout = 0 then fProcess.WaitFor
  2104. else fProcess.WaitFor(aTimeout);
  2105. end;
  2106. procedure TAsyncTask.Wait;
  2107. begin
  2108. fProcess.WaitFor;
  2109. end;
  2110. { TAsyncTask<T> }
  2111. constructor TAsyncTask<T>.Create(aAction: TFunc<T>);
  2112. begin
  2113. fWaitForResult := False;
  2114. fProcess := TSimpleThread.Create(procedure
  2115. begin
  2116. fTaskResult := aAction();
  2117. end,False,False);
  2118. end;
  2119. destructor TAsyncTask<T>.Destroy;
  2120. begin
  2121. if not fWaitForResult then fProcess.FreeOnTerminate := True;
  2122. inherited;
  2123. end;
  2124. class function TAsyncTask<T>.Run(const aAction: TFunc<T>): IAsyncTask<T>;
  2125. begin
  2126. Result := TAsyncTask<T>.Create(aAction);
  2127. end;
  2128. function TAsyncTask<T>.Result: T;
  2129. begin
  2130. fWaitForResult := True;
  2131. fProcess.WaitFor;
  2132. Result := fTaskResult;
  2133. fProcess.Free;
  2134. end;
  2135. function TAsyncTask<T>.Result(const aTimeout: Cardinal): T;
  2136. begin
  2137. fWaitForResult := True;
  2138. fProcess.WaitFor(aTimeout);
  2139. Result := fTaskResult;
  2140. fProcess.Free;
  2141. end;
  2142. { TParamValue }
  2143. constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  2144. begin
  2145. inherited Create;
  2146. fName := aName;
  2147. fValue := aValue;
  2148. fOwned := aOwnedValue;
  2149. end;
  2150. constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  2151. begin
  2152. inherited Create;
  2153. fName := aName;
  2154. fValue := aValue;
  2155. fOwned := aOwnedValue;
  2156. end;
  2157. constructor TParamValue.Create;
  2158. begin
  2159. fName := '';
  2160. fOwned := False;
  2161. end;
  2162. destructor TParamValue.Destroy;
  2163. begin
  2164. {$IFDEF FPC}
  2165. fValue._Release;
  2166. {$ENDIF}
  2167. if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
  2168. inherited;
  2169. end;
  2170. { TBackgroundWorkers }
  2171. constructor TBackgroundWorkers.Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  2172. begin
  2173. fConcurrentWorkers := aConcurrentWorkers;
  2174. fWorkerExecuteProc := aWorkerProc;
  2175. fWorkerPool := TWorkerPool.Create(True);
  2176. end;
  2177. destructor TBackgroundWorkers.Destroy;
  2178. begin
  2179. fWorkerPool.Free;
  2180. inherited;
  2181. end;
  2182. procedure TBackgroundWorkers.Start;
  2183. var
  2184. i : Integer;
  2185. worker : TWorker;
  2186. task : IWorkTask;
  2187. begin
  2188. for i := 1 to fConcurrentWorkers do
  2189. begin
  2190. task := TWorkTask.Create([],False,fWorkerExecuteProc)
  2191. .OnInitialize(fWorkerInitProc)
  2192. .OnRetry(fWorkerRetryProc)
  2193. .OnException(fWorkerExceptionProc)
  2194. .OnTerminated(fWorkerTerminateProc);
  2195. task.NumWorker := i;
  2196. task.Run;
  2197. worker := TSimpleWorker.Create(task,False);
  2198. fWorkerPool.Add(worker);
  2199. worker.Start;
  2200. end;
  2201. end;
  2202. procedure TBackgroundWorkers.Stop;
  2203. var
  2204. worker : TWorker;
  2205. begin
  2206. for worker in fWorkerPool do
  2207. begin
  2208. worker.Terminate;
  2209. worker.WaitFor;
  2210. fWorkerPool.Remove(worker);
  2211. end;
  2212. end;
  2213. function TBackgroundWorkers.OnException(aTaskProc: TTaskExceptionProc): TBackgroundWorkers;
  2214. begin
  2215. Result := Self;
  2216. fWorkerExceptionProc := aTaskProc;
  2217. end;
  2218. function TBackgroundWorkers.OnInitialize(aTaskProc: TTaskProc): TBackgroundWorkers;
  2219. begin
  2220. Result := Self;
  2221. fWorkerInitProc := aTaskProc;
  2222. end;
  2223. function TBackgroundWorkers.OnRetry(aTaskProc: TTaskRetryProc): TBackgroundWorkers;
  2224. begin
  2225. Result := Self;
  2226. fWorkerRetryProc := aTaskProc;
  2227. end;
  2228. function TBackgroundWorkers.OnTerminated(aTaskProc: TTaskProc): TBackgroundWorkers;
  2229. begin
  2230. Result := Self;
  2231. fWorkerTerminateProc := aTaskProc;
  2232. end;
  2233. function TBackgroundWorkers.Retry(aMaxRetries: Integer): TBackgroundWorkers;
  2234. begin
  2235. Result := Self;
  2236. SetRetryPolicy(aMaxRetries,0,1);
  2237. end;
  2238. function TBackgroundWorkers.RetryForever: TBackgroundWorkers;
  2239. begin
  2240. Result := Self;
  2241. SetRetryPolicy(-1,0,1);
  2242. end;
  2243. procedure TBackgroundWorkers.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double);
  2244. begin
  2245. fFaultPolicy.MaxRetries := aMaxRetries;
  2246. fFaultPolicy.WaitTimeBetweenRetries := aWaitTimeBetweenRetriesMS;
  2247. fFaultPolicy.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  2248. end;
  2249. function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  2250. begin
  2251. Result := Self;
  2252. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  2253. end;
  2254. function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  2255. begin
  2256. Result := Self;
  2257. SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
  2258. end;
  2259. function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  2260. begin
  2261. Result := Self;
  2262. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
  2263. end;
  2264. function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  2265. begin
  2266. Result := Self;
  2267. SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
  2268. end;
  2269. { TSimpleThread }
  2270. constructor TSimpleThread.Create(aProc: TProc; aCreateSuspended, aFreeOnTerminate : Boolean);
  2271. begin
  2272. if not Assigned(aProc) then raise EArgumentNilException.Create('param cannot be nil!');
  2273. fTimeoutFlag := TLightweightEvent.Create;
  2274. fTimeoutFlag.ResetEvent;
  2275. fExecuteProc := aProc;
  2276. inherited Create(aCreateSuspended);
  2277. Self.FreeOnTerminate := aFreeOnTerminate;
  2278. end;
  2279. destructor TSimpleThread.Destroy;
  2280. begin
  2281. if Assigned(fTimeoutFlag) then
  2282. begin
  2283. fTimeoutFlag.Release;
  2284. fTimeoutFlag.Free;
  2285. end;
  2286. inherited;
  2287. end;
  2288. procedure TSimpleThread.Execute;
  2289. begin
  2290. fExecuteProc;
  2291. fTimeoutFlag.SetEvent;
  2292. end;
  2293. function TSimpleThread.WaitFor(const aTimeout: Cardinal) : TWaitResult;
  2294. begin
  2295. Result := fTimeoutFlag.WaitFor(aTimeout);
  2296. if Result = TWaitResult.wrTimeout then raise Exception.Create('Timeout');
  2297. end;
  2298. end.