Quick.Threads.pas 74 KB


  1. { ***************************************************************************
  2. Copyright (c) 2016-2022 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.5
  7. Created : 09/03/2018
  8. Modified : 14/06/2022
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. SysUtils,
  28. DateUtils,
  29. Quick.Commons,
  30. //Quick.Chrono,
  31. Quick.Value,
  32. Quick.FaultControl,
  33. {$IFNDEF FPC}
  34. System.RTLConsts,
  35. System.Generics.Collections,
  36. System.SyncObjs;
  37. {$ELSE}
  38. RtlConsts,
  39. Generics.Collections,
  40. syncobjs;
  41. {$ENDIF}
  42. type
  // Bounded FIFO queue guarded by a critical section.
  // Items live in a dynamic array that PopItem reads as a ring buffer
  // (FQueueOffset wraps back to 0 when it reaches Length(FQueue)).
  // Blocking is done through FQueueCondVar: a TConditionVariableCS on Delphi,
  // a TEventObject on FPC.
  43. TThreadedQueueCS<T> = class
  44. private
  // Ring storage; its length is the queue capacity
  45. FQueue: array of T;
  // FQueueSize = number of stored items, FQueueOffset = index of the next item to pop
  46. FQueueSize, FQueueOffset: Integer;
  47. FQueueLock: TCriticalSection;
  48. {$IFDEF FPC}
  49. FQueueCondVar : TEventObject;
  50. {$ELSE}
  51. FQueueCondVar: TConditionVariableCS;
  52. {$ENDIF}
  // When set, PopItem stops waiting and returns once the queue is empty
  53. FShutDown: Boolean;
  // Wait limits handed to the condition variable / event (FPopTimeout is used by PopItem)
  54. FPushTimeout, FPopTimeout: Cardinal;
  // Running counters; FTotalItemsPopped is incremented on every successful pop
  55. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  56. public
  // Raises Exception when AQueueDepth is lower than 10
  57. constructor Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  58. destructor Destroy; override;
  // Changes the capacity by ADelta slots and signals waiting threads
  59. procedure Grow(ADelta: Integer);
  60. function PushItem(const AItem: T): TWaitResult; overload;
  61. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  // The function-style overloads return Default(T) when nothing could be popped
  62. function PopItem: T; overload;
  63. function PopItem(var AQueueSize: Integer): T; overload;
  // Core overload: the others delegate to it
  64. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  65. function PopItem(var AItem: T): TWaitResult; overload;
  66. procedure DoShutDown;
  67. procedure Clear;
  68. property QueueSize: Integer read FQueueSize;
  69. property ShutDown: Boolean read FShutDown;
  70. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  71. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  72. end;
  // Queue variant declared with a TQueue<T> as backing store instead of a raw array.
  // The public surface mirrors TThreadedQueueCS<T> except that no Clear is declared
  // and the default depth is 10. Method bodies are outside this excerpt.
  73. TThreadedQueueList<T> = class
  74. private
  75. fQueue : TQueue<T>;
  76. fQueueSize : Integer;
  77. fQueueLock : TCriticalSection;
  // Wait primitive differs per compiler: TSimpleEvent on FPC, TConditionVariableCS on Delphi
  78. {$IFDEF FPC}
  79. FQueueCondVar : TSimpleEvent;
  80. {$ELSE}
  81. FQueueCondVar: TConditionVariableCS;
  82. {$ENDIF}
  83. fShutDown : Boolean;
  84. fPushTimeout : Cardinal;
  85. fPopTimeout : Cardinal;
  86. fTotalItemsPushed : Cardinal;
  87. fTotalItemsPopped : Cardinal;
  88. public
  89. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  90. destructor Destroy; override;
  91. procedure Grow(ADelta: Integer);
  92. function PushItem(const AItem: T): TWaitResult; overload;
  93. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  94. function PopItem: T; overload;
  95. function PopItem(var AQueueSize: Integer): T; overload;
  96. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  97. function PopItem(var AItem: T): TWaitResult; overload;
  98. procedure DoShutDown;
  // Read-only views over the private counters (identifiers are case-insensitive)
  99. property QueueSize: Integer read FQueueSize;
  100. property ShutDown: Boolean read FShutDown;
  101. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  102. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  103. end;
  // Delphi-only object list exposing a LockList / UnlockList pair.
  // NOTE(review): the class derives from TList<T> and also holds a separate
  // TObjectList<T> in fList - confirm in the implementation which one stores the items.
  104. {$IFNDEF FPC}
  105. TThreadObjectList<T: class> = class(TList<T>)
  106. private
  107. fList: TObjectList<T>;
  // Lock token; how it is acquired is defined in the implementation (not shown here)
  108. fLock: TObject;
  109. fDuplicates: TDuplicates;
  110. function GetItem(aIndex : Integer) : T;
  111. procedure SetItem(aIndex : Integer; aValue : T);
  112. public
  // OwnedObjects presumably selects whether the inner list frees its items - TODO confirm
  113. constructor Create(OwnedObjects : Boolean);
  114. destructor Destroy; override;
  115. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  116. procedure Add(const Item: T);
  117. procedure Clear;
  // Returns the inner TObjectList<T>; callers are expected to pair it with UnlockList
  118. function LockList: TObjectList<T>;
  119. procedure Remove(const Item: T); inline;
  120. procedure RemoveItem(const Item: T; Direction: TDirection);
  121. procedure UnlockList; inline;
  122. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  123. end;
  124. {$ENDIF}
  // Callback types used by the thread helpers below.
  // Delphi: anonymous-method references; TProc itself is not declared here
  // (presumably the one from SysUtils) and TAnonProc is an alias for it.
  // FPC: plain method pointers ("of object"); TAnonProc is not declared in this branch.
  125. {$IFNDEF FPC}
  126. TAnonExceptionProc = reference to procedure(aException : Exception);
  127. TAnonProc = TProc;
  128. {$ELSE}
  129. TProc = procedure of object;
  130. TAnonExceptionProc = procedure(aException : Exception) of object;
  131. {$ENDIF}
  // Outcome states of a unit of threaded work
  132. TThreadWorkStatus = (wsRunning, wsDone, wsException);
  // TThread descendant that carries a TProc to execute and an event object
  // (fTimeoutFlag) that backs the timed WaitFor overload.
  // Method bodies are outside this excerpt.
  133. TSimpleThread = class(TThread)
  134. private
  135. fExecuteProc : TProc;
  // Event type depends on the compiler: TLightweightEvent on Delphi, TSimpleEvent on FPC
  136. {$IFNDEF FPC}
  137. fTimeoutFlag : TLightweightEvent;
  138. {$ELSE}
  139. fTimeoutFlag : TSimpleEvent;
  140. {$ENDIF}
  141. public
  142. constructor Create(aProc: TProc; aCreateSuspended, aFreeOnTerminate : Boolean);
  143. destructor Destroy; override;
  144. procedure Execute; override;
  // Timed overload in addition to the inherited TThread.WaitFor; aTimeout presumably in milliseconds - TODO confirm
  145. function WaitFor(const aTimeout : Cardinal) : TWaitResult; overload;
  146. end;
  // TThread descendant that stores three callbacks (execute, exception, terminate)
  // plus two flags saying whether execute / terminate should be synchronized.
  // Method bodies are outside this excerpt.
  147. TAdvThread = class(TThread)
  148. private
  149. fExecuteProc : TProc;
  150. fExceptionProc : TAnonExceptionProc;
  151. fTerminateProc : TProc;
  152. fExecuteWithSync : Boolean;
  153. fTerminateWithSync : Boolean;
  154. procedure DoExecute;
  155. procedure CallToTerminate;
  156. protected
  157. procedure DoTerminate; override;
  158. public
  // aSynchronize presumably feeds fExecuteWithSync - TODO confirm in the implementation
  159. constructor Create(aProc : TProc; aSynchronize : Boolean);
  // Registration methods for the optional callbacks
  160. procedure OnException(aProc : TAnonExceptionProc);
  161. procedure OnTerminate(aProc : TProc; aSynchronize : Boolean);
  162. procedure Execute; override;
  163. end;
  // Fluent contract for an anonymous thread: each On* method returns the
  // interface again so calls can be chained before Start.
  // No GUID is declared for this interface.
  164. IAnonymousThread = interface
  165. procedure Start;
  166. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  167. function OnTerminate(aProc : TProc) : IAnonymousThread;
  168. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread;
  169. end;
  // Reference-counted implementation of IAnonymousThread wrapping a TAdvThread.
  // The constructor is private: instances are obtained through the
  // Execute / Execute_Sync class functions.
  170. TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
  171. private
  172. fThread : TAdvThread;
  173. constructor Create(aProc : TProc; aSynchronize : Boolean);
  174. public
  // Factory functions; the _Sync variant presumably requests synchronized execution - TODO confirm
  175. class function Execute(aProc : TProc) : IAnonymousThread; overload;
  176. class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload;
  177. procedure Start;
  178. function OnException(aProc : TAnonExceptionProc) : IAnonymousThread;
  179. function OnTerminate(aProc : TProc) : IAnonymousThread; overload;
  180. function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload;
  181. end;
  // Named task parameter: a name, a TFlexValue payload and an ownership flag.
  182. TParamValue = class
  183. private
  184. fName : string;
  185. fValue : TFlexValue;
  // Presumably tells the destructor whether to free an object held in fValue - TODO confirm
  186. fOwned : Boolean;
  187. public
  188. constructor Create; overload;
  // The value can be supplied either as a TFlexValue or as a raw TVarRec
  189. constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload;
  190. constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload;
  191. destructor Destroy; override;
  192. property Name : string read fName write fName;
  193. property Value : TFlexValue read fValue write fValue;
  194. property Owned : Boolean read fOwned write fOwned;
  195. end;
  // Object list of task parameters
  196. TParamList = TObjectList<TParamValue>;
  // States a task can be in
  197. TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
  // Scheduling mode of a scheduled task: single run or repetition
  198. TScheduleMode = (smRunOnce, smRepeatMode);
  // Unit used together with an interval by the RepeatEvery methods
  199. TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds);
  // Exception classes of the task framework; all derive directly from Exception
  200. ETaskAddError = class(Exception);
  201. ETaskInitializationError = class(Exception);
  202. ETaskExecutionError = class(Exception);
  203. ETaskParamError = class(Exception);
  204. ETaskSchedulerError = class(Exception);
  // Base contract shared by work tasks and scheduled tasks: parameter access
  // (by index or by name), status / retry queries, result storage and the
  // Do* entry points.
  205. ITask = interface
  206. ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
  207. function GetParam(aIndex : Integer) : TFlexValue; overload;
  208. function GetParam(const aName : string) : TFlexValue; overload;
  // Pointer-returning variant of the indexed getter
  209. function GetParam2(aIndex : Integer) : PFlexValue;
  210. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  211. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  212. function TaskStatus : TWorkTaskStatus;
  213. function GetNumWorker : Integer;
  214. procedure SetNumWorker(Value : Integer);
  215. function GetIdTask : Int64;
  216. procedure SetIdTask(Value : Int64);
  217. function GetResult : TFlexValue;
  218. procedure SetResult(aValue : TFlexValue);
  219. procedure DoExecute;
  220. procedure DoException(aException : Exception);
  221. procedure DoTerminate;
  222. procedure Enable;
  223. procedure Disable;
  // Delphi declares Param twice as an overloaded default array property;
  // FPC names the string-indexed one ParamByName and makes only that one default
  224. {$IFNDEF FPC}
  225. property Param[index : Integer] : TFlexValue read GetParam write SetParam; default;
  226. property Param[const Name : string] : TFlexValue read GetParam write SetParam; default;
  227. {$ELSE}
  228. property Param[index : Integer] : TFlexValue read GetParam write SetParam;
  229. property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default;
  230. {$ENDIF}
  231. property NumWorker : Integer read GetNumWorker write SetNumWorker;
  232. property Result : TFlexValue read GetResult write SetResult;
  // Read-only here even though SetIdTask is part of the interface
  233. property IdTask : Int64 read GetIdTask;
  234. function Done : Boolean;
  235. function Failed : Boolean;
  236. function NumRetries : Integer;
  237. function MaxRetries : Integer;
  238. function LastException : Exception;
  239. function CircuitBreaked : Boolean;
  240. function IsEnabled : Boolean;
  241. end;
  // Task callback signatures. Delphi uses anonymous-method references,
  // FPC uses method pointers with the same parameter lists.
  // TTaskRetryProc receives aStopRetries by reference so the handler can set it.
  242. {$IFNDEF FPC}
  243. TTaskProc = reference to procedure(task : ITask);
  244. TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
  245. TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean);
  246. {$ELSE}
  247. TTaskProc = procedure(task : ITask) of object;
  248. TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
  249. TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object;
  250. {$ENDIF}
  // Fluent extension of ITask for run-once work: every configuration method
  // returns IWorkTask so calls can be chained, finishing with Run.
  // NOTE(review): unlike ITask and IScheduledTask, no GUID is declared here.
  251. IWorkTask = interface(ITask)
  // Callback registration (the _Sync variants presumably run the callback synchronized - TODO confirm)
  252. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  253. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  254. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask;
  255. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask;
  256. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
  257. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask;
  // Retry policy configuration
  258. function Retry(aMaxRetries : Integer) : IWorkTask;
  259. function RetryForever : IWorkTask;
  260. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  261. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  262. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  263. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  264. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  // Named parameter assignment, with or without an explicit ownership flag
  265. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  266. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  267. procedure Run;
  268. end;
  // Fluent extension of ITask for scheduled work: callback registration,
  // retry policy, start-time selection (Start*) and repetition (RunOnce / RepeatEvery*).
  // Configuration functions return IScheduledTask so they can be chained;
  // the RunOnce / RepeatEvery* methods are procedures and therefore end a chain.
  269. IScheduledTask = interface(ITask)
  270. ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
  271. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  272. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  273. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask;
  274. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask;
  275. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
  276. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  277. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
  278. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask;
  279. function Retry(aMaxRetries : Integer) : IScheduledTask;
  280. function RetryForever : IScheduledTask;
  281. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  282. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  283. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  284. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  285. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  // Scheduler-facing entry points
  286. function CheckSchedule : Boolean;
  287. procedure DoExpire;
  288. function GetTaskName : string;
  // Start-time selection
  289. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  290. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  291. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  292. function StartOnDayChange : IScheduledTask;
  293. function StartNow : IScheduledTask;
  294. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  295. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  // Repetition: unbounded, until an end time, or a fixed number of times
  296. procedure RunOnce;
  297. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  298. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  299. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  300. procedure RepeatEveryDay;
  301. procedure RepeatEveryWeek;
  302. function IsFinished : Boolean;
  303. procedure Cancel;
  304. property Name : string read GetTaskName;
  305. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  306. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  307. end;
  // Base implementation of ITask shared by TWorkTask and TScheduledTask.
  // Holds the parameter list, the registered callbacks, the sync flags and a
  // TFaultControl instance. Method bodies are outside this excerpt.
  308. TTask = class(TInterfacedObject,ITask)
  309. private
  310. fIdTask : Int64;
  311. fNumWorker : Integer;
  312. fNumRetries : Integer;
  313. fParamList : TParamList;
  // Registered callbacks
  314. fInitializeProc : TTaskProc;
  315. fExecuteProc : TTaskProc;
  316. fExceptProc : TTaskExceptionProc;
  317. fTerminateProc : TTaskProc;
  318. fExpiredProc : TTaskProc;
  319. fTaskStatus : TWorkTaskStatus;
  320. fOwnedParams : Boolean;
  321. fEnabled : Boolean;
  // Flags selecting synchronized invocation of the matching callback
  322. fExecuteWithSync : Boolean;
  323. fExceptionWithSync : Boolean;
  324. fRetryProc : TTaskRetryProc;
  325. fTerminateWithSync : Boolean;
  326. fFaultControl : TFaultControl;
  327. fCustomFaultPolicy : Boolean;
  328. fResult : TFlexValue;
  329. function GetParam(aIndex : Integer) : TFlexValue; overload;
  330. function GetParam(const aName : string) : TFlexValue; overload;
  331. function GetParam2(aIndex : Integer) : PFlexValue;
  332. procedure SetParam(aIndex : Integer; Value : TFlexValue); overload;
  333. procedure SetParam(const aName : string; Value : TFlexValue); overload;
  // Extra overload (not part of ITask) that also carries an ownership flag
  334. procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload;
  335. procedure DoInitialize;
  336. procedure DoExecute;
  337. procedure DoException(aException : Exception);
  338. procedure DoTerminate;
  339. function GetNumWorker : Integer;
  340. procedure SetNumWorker(Value : Integer);
  341. function GetIdTask : Int64;
  342. procedure SetIdTask(Value : Int64);
  343. function GetResult : TFlexValue;
  344. procedure SetResult(aValue : TFlexValue);
  345. protected
  // Exposed to descendants so they can configure fault handling and sync behaviour
  346. property FaultControl : TFaultControl read fFaultControl write fFaultControl;
  347. property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy;
  348. property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync;
  349. property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync;
  350. property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync;
  351. procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean);
  352. procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy);
  353. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload;
  354. procedure SetRetryPolicy(aWaitTimeMSArray : TArray<Integer>); overload;
  355. public
  // Virtual constructor: parameters arrive as an open "array of const" plus the execute callback
  356. constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
  357. destructor Destroy; override;
  358. property IdTask : Int64 read GetIdTask;
  359. property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
  360. function IsEnabled : Boolean;
  361. function TaskStatus : TWorkTaskStatus;
  362. function Done : Boolean;
  363. function Failed : Boolean;
  364. function NumRetries : Integer;
  365. function MaxRetries : Integer;
  366. function LastException : Exception;
  367. function CircuitBreaked : Boolean;
  368. procedure Disable;
  369. procedure Enable;
  370. end;
  // TTask descendant implementing the fluent IWorkTask contract.
  // The callback registration methods and Run are virtual; the retry and
  // parameter methods are static. Method bodies are outside this excerpt.
  371. TWorkTask = class(TTask,IWorkTask)
  372. public
  373. function OnInitialize(aTaskProc : TTaskProc) : IWorkTask;
  374. function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  375. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
  376. function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
  377. function OnTerminated_Sync(aTaskProc : TTaskProc) : IWorkTask; virtual;
  378. function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual;
  379. function Retry(aMaxRetries : Integer) : IWorkTask;
  380. function RetryForever : IWorkTask;
  381. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  382. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IWorkTask; overload;
  383. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  384. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload;
  385. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload;
  386. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload;
  387. function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload;
  388. procedure Run; virtual;
  389. end;
  // Queue type used for pending work tasks: the array-backed thread-safe queue holding IWorkTask references
  390. TTaskQueue = TThreadedQueueCS<IWorkTask>;
  // TTask descendant implementing IScheduledTask. Adds the schedule state
  // (start / last / next execution, interval and measure, expiration by date or
  // by number of runs) on top of the base task. Method bodies are outside this excerpt.
  391. TScheduledTask = class(TTask,IScheduledTask)
  392. private
  393. fName : string;
  394. fExecutionTimes : Integer;
  395. fScheduleMode : TScheduleMode;
  // Repetition interval expressed as a count of fTimeMeasure units
  396. fTimeInterval : Integer;
  397. fTimeMeasure : TTimeMeasure;
  398. fStartDate : TDateTime;
  399. fLastExecution : TDateTime;
  400. fNextExecution : TDateTime;
  // Two expiration criteria are stored: a date and a number of executions
  401. fExpirationDate : TDateTime;
  402. fExpirationTimes : Integer;
  403. fFinished : Boolean;
  404. fExpireWithSync: Boolean;
  405. procedure ClearSchedule;
  406. function CheckSchedule : Boolean;
  407. procedure DoExpire;
  408. function GetTaskName : string;
  409. function GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  410. protected
  411. property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync;
  412. public
  // Read/write on the class; the IScheduledTask interface only exposes it read-only
  413. property Name : string read fName write fName;
  414. function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask;
  // Time measure together with an Integer, as returned by GetCurrentSchedule
  415. property CurrentSchedule : TPair<TTimeMeasure, Integer> read GetCurrentSchedule;
  416. function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  417. function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
  418. function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual;
  419. function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  420. function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  421. function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  422. function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual;
  423. function StartAt(aStartDate : TDateTime) : IScheduledTask;
  424. function StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  425. function StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  426. function StartOnDayChange : IScheduledTask;
  427. function StartNow : IScheduledTask;
  428. function StartInMinutes(aMinutes : Word) : IScheduledTask;
  429. function StartInSeconds(aSeconds : Word) : IScheduledTask;
  430. procedure RunOnce;
  431. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
  432. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
  433. procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
  434. procedure RepeatEveryDay;
  435. procedure RepeatEveryWeek;
  436. function Retry(aMaxRetries : Integer) : IScheduledTask;
  437. function RetryForever : IScheduledTask;
  438. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  439. function WaitAndRetry(aWaitTimeArray : TArray<Integer>) : IScheduledTask; overload;
  440. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  441. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload;
  442. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload;
  443. function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload;
  444. function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload;
  445. function IsFinished : Boolean;
  446. procedure Cancel;
  447. end;
  // Activity states reported by a worker thread
  448. TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
  // Base worker thread: keeps the task currently being processed, its own
  // status and a default fault policy. Concrete workers override Execute.
  // Method bodies are outside this excerpt.
  449. TWorker = class(TThread)
  450. protected
  451. fStatus : TWorkerStatus;
  452. fCurrentTask : ITask;
  453. fDefaultFaultPolicy : TFaultPolicy;
  454. procedure ExecuteTask;
  455. procedure TerminateTask;
  456. public
  457. constructor Create;
  458. destructor Destroy; override;
  459. property Status : TWorkerStatus read fStatus;
  // Takes the concrete TTask class rather than the ITask interface
  460. procedure SetFaultPolicy(aTask : TTask);
  461. procedure Execute; override;
  462. end;
  // Worker constructed around a single ITask; aRunOnce defaults to True.
  // How fRunOnce affects Execute is defined in the implementation (not shown here).
  463. TSimpleWorker = class(TWorker)
  464. private
  465. fRunOnce : Boolean;
  466. public
  467. constructor Create(aTask : ITask; aRunOnce : Boolean = True);
  468. procedure Execute; override;
  469. end;
  // Worker bound to a shared TTaskQueue and identified by a worker number.
  // Method bodies are outside this excerpt.
  470. TQueueWorker = class(TWorker)
  471. private
  // NOTE(review): declared as Integer while ITask.GetIdTask returns Int64 - confirm no truncation where it is assigned
  472. fCurrentIdTask : Integer;
  473. fNumWorker : Integer;
  // Queue reference received through the constructor
  474. fTaskQueue : TTaskQueue;
  475. public
  476. constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
  477. property NumWorker : Integer read fNumWorker;
  478. procedure Execute; override;
  479. end;
  // Worker constructed around one IScheduledTask; adds a private ExpireTask
  // step to the base worker. Method bodies are outside this excerpt.
  480. TScheduledWorker = class(TWorker)
  481. private
  482. procedure ExpireTask;
  483. public
  484. constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  485. procedure Execute; override;
  486. end;
  // Object list of worker threads, used as the pool type by the background task classes
  487. TWorkerPool = TObjectList<TWorker>;
  // Static entry points that build an IWorkTask from a callback, optionally with
  // an "array of const" parameter list. The _Sync variants presumably request
  // synchronized execution of the callback - TODO confirm in the implementation.
  488. TRunTask = class
  489. public
  490. class function Execute(aTaskProc: TTaskProc): IWorkTask; overload;
  491. class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  492. class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload;
  493. class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload;
  494. end;
  // Handle to an asynchronous action that can be waited on, with or without a timeout
  495. IAsyncTask = interface
  496. ['{90A27D06-6FCD-493C-8AA0-C52C5105ED8B}']
  497. procedure Wait; overload;
  // aTimeout presumably in milliseconds - TODO confirm
  498. procedure Wait(const aTimeout : Cardinal); overload;
  499. end;
  // Reference-counted IAsyncTask implementation backed by a TSimpleThread.
  // The constructor is private; instances come from the Run class function.
  500. TAsyncTask = class(TInterfacedObject,IAsyncTask)
  501. private
  502. fProcess : TSimpleThread;
  503. constructor Create(aAction : TProc);
  504. public
  // Factory; declared virtual so descendants can override it
  505. class function Run(const aAction : TProc) : IAsyncTask; virtual;
  506. procedure Wait; overload;
  507. procedure Wait(const aTimeout : Cardinal); overload;
  508. destructor Destroy; override;
  509. end;
  // Delphi-only generic flavour of the async task: the action is a TFunc<T>
  // and the outcome is read through Result, with or without a timeout.
  510. {$IFNDEF FPC}
  511. IAsyncTask<T> = interface
  512. ['{8529BBD4-B5AD-4674-8E42-3C74F5156A97}']
  513. function Result : T; overload;
  514. function Result(const aTimeout : Cardinal) : T; overload;
  515. end;
  // Implementation backed by a TSimpleThread; the computed value is kept in fTaskResult.
  // Result is declared private, so it is reachable only through the IAsyncTask<T> interface.
  516. TAsyncTask<T> = class(TInterfacedObject,IAsyncTask<T>)
  517. private
  518. fProcess : TSimpleThread;
  519. fTaskResult : T;
  520. fWaitForResult : Boolean;
  521. function Result : T; overload;
  522. function Result(const aTimeout : Cardinal) : T; overload;
  523. constructor Create(aAction : TFunc<T>);
  524. public
  525. class function Run(const aAction : TFunc<T>) : IAsyncTask<T>; virtual;
  526. destructor Destroy; override;
  527. end;
  528. {$ENDIF}
  // Front end for queued background work: owns a task queue and a worker pool
  // and hands out IWorkTask instances through the AddTask overloads.
  // Method bodies are outside this excerpt.
  529. TBackgroundTasks = class
  530. private
  531. fMaxQueue : Integer;
  532. fWorkerPool : TWorkerPool;
  533. fConcurrentWorkers : Integer;
  534. fInsertTimeout : Cardinal;
  535. fExtractTimeout : Cardinal;
  536. fTaskQueue : TTaskQueue;
  537. fNumPushedTasks : Int64;
  // Getter behind the TaskQueued property
  538. function GetTaskQueue : Cardinal;
  539. public
  540. constructor Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  541. destructor Destroy; override;
  542. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  543. property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
  544. property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
  545. property TaskQueued : Cardinal read GetTaskQueue;
  546. property NumPushedTasks : Int64 read fNumPushedTasks;
  547. property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
  // Each AddTask flavour exists with and without a parameter array, plus a _Sync twin
  548. function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
  549. function AddTask_Sync(aTaskProc : TTaskProc) : IWorkTask; overload;
  550. function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  551. function AddTask_Sync(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
  552. procedure Start;
  553. procedure CancelAll;
  554. end;
  // Plain generic list of scheduled task interfaces
  555. TScheduledTaskList = TList<IScheduledTask>;
  // Scheduler thread working on a TScheduledTaskList received through the
  // constructor. Declares a critical section for the list and an event object;
  // how they are used is defined in the implementation (not shown here).
  556. TScheduler = class(TThread)
  557. private
  558. fListLock : TCriticalSection;
  559. fCondVar : TSimpleEvent;
  560. fTaskList : TScheduledTaskList;
  561. fRemoveTaskAfterExpiration : Boolean;
  562. public
  563. constructor Create(aTaskList : TScheduledTaskList);
  564. destructor Destroy; override;
  565. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  566. procedure Execute; override;
  // Add takes the concrete class, while the lookups return the interface
  567. function Add(aTask : TScheduledTask) : Integer;
  568. function Get(aIdTask : Int64) : IScheduledTask; overload;
  569. function Get(const aTaskName : string) : IScheduledTask; overload;
  570. end;
  // Front end for scheduled work: keeps the task list, the scheduler thread and
  // a fault policy, and creates named IScheduledTask instances through AddTask.
  // Method bodies are outside this excerpt.
  571. TScheduledTasks = class
  572. private
  573. fTaskList : TScheduledTaskList;
  574. fScheduler : TScheduler;
  575. fNumPushedTasks : Int64;
  576. fRemoveTaskAfterExpiration : Boolean;
  577. fIsStarted : Boolean;
  578. fFaultPolicy : TFaultPolicy;
  579. public
  580. constructor Create;
  581. destructor Destroy; override;
  582. property NumPushedTasks : Int64 read fNumPushedTasks;
  583. property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
  584. property IsStarted : Boolean read fIsStarted;
  585. property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy;
  // Every task is registered under a name; lookups work by id or by that name
  586. function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  587. function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
  588. function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  589. function AddTask_Sync(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
  590. function GetTask(aIdTask : Int64) : IScheduledTask; overload;
  591. function GetTask(const aTaskName : string) : IScheduledTask; overload;
  592. procedure Start;
  593. procedure Stop;
  594. end;
  // Pool of workers that all share the same callbacks (initialize, execute,
  // retry, exception, terminate) and one fault policy. The configuration
  // methods return the instance itself so calls can be chained before Start.
  // Method bodies are outside this excerpt.
  595. TBackgroundWorkers = class
  596. private
  597. fWorkerPool : TWorkerPool;
  598. fConcurrentWorkers : Integer;
  599. fWorkerInitProc : TTaskProc;
  600. fWorkerExecuteProc : TTaskProc;
  601. fWorkerRetryProc : TTaskRetryProc;
  602. fWorkerExceptionProc : TTaskExceptionProc;
  603. fWorkerTerminateProc : TTaskProc;
  604. fFaultPolicy : TFaultPolicy;
  605. procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  606. public
  607. constructor Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  608. destructor Destroy; override;
  // Read-only here, unlike the same-named property of TBackgroundTasks
  609. property ConcurrentWorkers : Integer read fConcurrentWorkers;
  610. function OnInitialize(aTaskProc : TTaskProc) : TBackgroundWorkers;
  611. function OnException(aTaskProc : TTaskExceptionProc) : TBackgroundWorkers;
  612. function OnRetry(aTaskProc : TTaskRetryProc) : TBackgroundWorkers;
  613. function OnTerminated(aTaskProc : TTaskProc) : TBackgroundWorkers;
  614. function Retry(aMaxRetries : Integer) : TBackgroundWorkers;
  615. function RetryForever : TBackgroundWorkers;
  616. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  617. function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  618. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : TBackgroundWorkers; overload;
  619. function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : TBackgroundWorkers; overload;
  620. procedure Start;
  621. procedure Stop;
  622. end;
  623. implementation
  624. { TThreadedQueueCS<T> }
  { Discards every queued item and wakes all threads blocked on the queue.

    The queue is left empty but usable: the item counter and read offset are
    reset and the storage is re-allocated with its previous capacity.
    PopItem reads FQueue[FQueueOffset] whenever FQueueSize > 0 and treats
    Length(FQueue) as the capacity, so both must stay consistent with the array.

    Stored objects are freed only when T is exactly TObject (same rule as before). }
  procedure TThreadedQueueCS<T>.Clear;
  var
    obj : T;
    LCapacity : Integer;
  begin
    FQueueLock.Enter;
    try
      // The type test does not depend on the element, so evaluate it once
      if TypeInfo(T) = TypeInfo(TObject) then
      begin
        for obj in FQueue do
        begin
          PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
        end;
      end;
      LCapacity := Length(FQueue);
      // Releasing the array finalizes managed elements (interfaces, strings, ...)
      SetLength(FQueue, 0);
      // Re-allocate the same number of slots; SetLength zero-initialises new elements
      SetLength(FQueue, LCapacity);
      FQueueSize := 0;
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    // Wake waiters so blocked producers / consumers re-evaluate the queue state
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  { Creates the queue with AQueueDepth slots and the given wait timeouts.

    AQueueDepth  - capacity of the ring buffer; must be 10 or greater.
    PushTimeout  - stored in FPushTimeout for the push side.
    PopTimeout   - stored in FPopTimeout; PopItem passes it to the wait call.

    Raises Exception when AQueueDepth is lower than 10.

    The synchronisation objects are created before the depth is validated:
    an exception raised inside a constructor makes the runtime call Destroy on
    the partially built instance, and Destroy calls DoShutDown (body not visible
    here, presumably signalling FQueueCondVar) before freeing both objects. }
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 16; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    FQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    // Anonymous event: a fixed name would make every queue instance share one
    // named kernel event on platforms that honour event names
    FQueueCondVar := TEventObject.Create(nil, True, False, '');
    {$ELSE}
    FQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
    FPushTimeout := PushTimeout;
    FPopTimeout := PopTimeout;
    if AQueueDepth < 10 then raise Exception.Create('QueueDepth will be 10 or greater value');
    SetLength(FQueue, AQueueDepth);
  end;
  // Shuts the queue down and releases its synchronization objects.
  // Fix: when the constructor raises (AQueueDepth < 10) the destructor runs with
  // FQueueCondVar still nil; DoShutDown would then dereference nil and hide the
  // original exception, so it is only called when the object exists.
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    if Assigned(FQueueCondVar) then DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  // Enlarges the ring buffer by ADelta slots and wakes waiting threads.
  // Fix: the buffer is circular, so when the live items wrap past the end of the old
  // array, simply appending slots broke the FIFO order (new slots ended up in the
  // middle of the live range). The segment [FQueueOffset..oldLen-1] is now moved to
  // the end of the enlarged array and FQueueOffset is shifted accordingly.
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    oldLen, i : Integer;
  begin
    FQueueLock.Enter;
    try
      oldLen := Length(FQueue);
      SetLength(FQueue, oldLen + ADelta);
      if (ADelta > 0) and (FQueueSize > 0) and (FQueueOffset + FQueueSize > oldLen) then
      begin
        //walk downwards so a slot is never overwritten before it has been moved
        for i := oldLen - 1 downto FQueueOffset do
        begin
          FQueue[i + ADelta] := FQueue[i];
          FQueue[i] := Default(T);
        end;
        Inc(FQueueOffset, ADelta);
      end;
    finally
      FQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Pops one item and returns it; the wait result and the queue size are discarded.
  function TThreadedQueueCS<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  // Waits until the queue holds an item, the pop timeout elapses or the queue is shut
  // down, then moves the head item into AItem. AItem is Default(T) when nothing was
  // popped. Returns the last wait result; AQueueSize receives the item count read
  // just before the lock is released.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      //wait while there is nothing to pop
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPopTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
        {$ENDIF}
      end;
      if (Result = wrSignaled) and not (FShutDown and (FQueueSize = 0)) then
      begin
        AItem := FQueue[FQueueOffset];
        FQueue[FQueueOffset] := Default(T);
        //buffer was full: wake threads waiting for a free slot
        if FQueueSize = Length(FQueue) then
        begin
          {$IFDEF FPC}
          FQueueCondVar.SetEvent;
          {$ELSE}
          FQueueCondVar.ReleaseAll;
          {$ENDIF}
        end;
        Dec(FQueueSize);
        Inc(FTotalItemsPopped);
        //advance the head, wrapping at the end of the buffer
        Inc(FQueueOffset);
        if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
      end;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Pops one item into AItem and returns the wait result; the queue size is discarded.
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops one item and returns it; AQueueSize receives the resulting queue size.
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem and returns the wait result; the resulting queue size is discarded.
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  // Appends AItem at the tail of the ring buffer, growing the buffer beforehand when
  // it is about to fill up. Nothing is stored when the queue is shut down or the wait
  // did not end signaled. Returns the last wait result; AQueueSize receives the item
  // count read just before the lock is released.
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    delta: Integer;
  begin
    FQueueLock.Enter;
    try
      //grow by the current item count below 1024 items, by half of it otherwise
      if FQueueSize >= High(FQueue) then
      begin
        if FQueueSize < 1024 then delta := FQueueSize
        else delta := FQueueSize div 2;
        Grow(delta);
      end;
      Result := wrSignaled;
      //wait while the buffer is full
      while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
      begin
        {$IFDEF FPC}
        Result := FQueueCondVar.WaitFor(FPushTimeout);
        {$ELSE}
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
        {$ENDIF}
      end;
      if (Result = wrSignaled) and not FShutDown then
      begin
        //queue was empty: wake threads waiting for an item
        if FQueueSize = 0 then
        begin
          {$IFDEF FPC}
          FQueueCondVar.SetEvent;
          {$ELSE}
          FQueueCondVar.ReleaseAll;
          {$ENDIF}
        end;
        FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
        Inc(FQueueSize);
        Inc(FTotalItemsPushed);
      end;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  // Flags the queue as shut down and wakes every thread blocked in PushItem/PopItem.
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FShutDown := True;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  780. { TThreadedQueueList<T> }
  // Creates the queue over a TQueue<T> with AQueueDepth initial capacity and stores
  // the push/pop timeouts.
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    fPushTimeout := PushTimeout;
    fPopTimeout := PopTimeout;
    fQueueSize := 0;
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    fQueueLock := TCriticalSection.Create;
    {$IFDEF FPC}
    FQueueCondVar := TSimpleEvent.Create;
    {$ELSE}
    fQueueCondVar := TConditionVariableCS.Create;
    {$ENDIF}
  end;
  // Shuts the queue down, then frees the condition object, the lock and the
  // underlying TQueue<T>.
  destructor TThreadedQueueList<T>.Destroy;
  begin
    DoShutDown;
    fQueueCondVar.Free;
    fQueueLock.Free;
    fQueue.Free;
    inherited;
  end;
  // Raises the capacity of the underlying queue by ADelta and wakes waiting threads.
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    newCapacity: Integer;
  begin
    fQueueLock.Enter;
    try
      newCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := newCapacity;
    finally
      fQueueLock.Leave;
    end;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  // Pops one item and returns it; the wait result and the queue size are discarded.
  function TThreadedQueueList<T>.PopItem: T;
  var
    ignoredSize: Integer;
  begin
    PopItem(ignoredSize, Result);
  end;
  {$IFDEF FPC}
  // FPC variant: waits on the event without holding the lock, then extracts the head
  // item under the lock. AItem stays Default(T) when nothing was popped.
  // NOTE(review): the size test in the wait loop runs outside the lock, so two
  // consumers could both reach Extract for a single item - confirm the intended usage.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FPopTimeout);
      end;
      fQueueLock.Enter;
      try
        if (Result = wrSignaled) and not (FShutDown and (fQueueSize = 0)) then
        begin
          AItem := fQueue.Extract;
          Dec(FQueueSize);
          Inc(fTotalItemsPopped);
        end;
      finally
        fQueueLock.Leave;
      end;
    finally
      AQueueSize := fQueueSize;
    end;
  end;
  {$ELSE}
  // Delphi variant: waits on the condition variable (which releases the lock while
  // waiting) and extracts the head item. AItem stays Default(T) when nothing was
  // popped. AQueueSize receives the item count read before the lock is released.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
      end;
      if (Result = wrSignaled) and not (FShutDown and (fQueueSize = 0)) then
      begin
        AItem := fQueue.Extract;
        if fQueueSize = fQueue.Count then
        begin
          FQueueCondVar.ReleaseAll;
        end;
        Dec(FQueueSize);
        Inc(fTotalItemsPopped);
      end;
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Pops one item into AItem and returns the wait result; the queue size is discarded.
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PopItem(ignoredSize, AItem);
  end;
  // Pops one item and returns it; AQueueSize receives the resulting queue size.
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  // Pushes AItem and returns the wait result; the resulting queue size is discarded.
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    ignoredSize: Integer;
  begin
    Result := PushItem(AItem, ignoredSize);
  end;
  {$IFDEF FPC}
  // FPC variant: enqueues AItem under the lock unless the queue is shut down, growing
  // the capacity by 10 first when the counter has reached the queue's item count.
  // Always returns wrSignaled; this variant does not signal the event.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      if FQueueSize >= fQueue.Count then Grow(10);
      Result := wrSignaled;
      if not fShutDown then
      begin
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ELSE}
  // Delphi variant: enqueues AItem under the lock unless the queue is shut down and
  // wakes waiting consumers when the queue was empty. Always returns wrSignaled.
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      if not fShutDown then
      begin
        if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
        fQueue.Enqueue(AItem);
        Inc(FQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      FQueueLock.Leave;
    end;
  end;
  {$ENDIF}
  // Flags the queue as shut down and wakes every waiting thread.
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fShutDown := True;
    {$IFDEF FPC}
    FQueueCondVar.SetEvent;
    {$ELSE}
    FQueueCondVar.ReleaseAll;
    {$ENDIF}
  end;
  955. {$IFNDEF FPC}
  956. { TThreadObjectList<T> }
  // Adds Item under the list lock, honouring the Duplicates setting: a duplicate is
  // added (dupAccept), silently skipped, or rejected with EListError (dupError).
  procedure TThreadObjectList<T>.Add(const Item: T);
  begin
    LockList;
    try
      if (Duplicates <> dupAccept) and (fList.IndexOf(Item) <> -1) then
      begin
        if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
      end
      else fList.Add(Item);
    finally
      UnlockList;
    end;
  end;
  // Removes every item from the inner list while holding the lock.
  procedure TThreadObjectList<T>.Clear;
  begin
    LockList;
    try
      fList.Clear;
    finally
      UnlockList;
    end;
  end;
  // Creates the lock object and the inner object list; duplicates are ignored by default.
  // Fix: OwnedObjects was ignored, so the inner TObjectList<T> always owned (and freed)
  // its items even when the caller passed False. The flag is now forwarded.
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  // Frees the inner list while holding the lock; the lock object itself is released
  // last, after it has been unlocked.
  destructor TThreadObjectList<T>.Destroy;
  begin
    LockList;
    try
      fList.Free;
      inherited Destroy;
    finally
      UnlockList;
      fLock.Free;
    end;
  end;
  // Returns the item stored at aIndex, read while holding the lock.
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  begin
    LockList;
    try
      Result := fList[aIndex];
    finally
      UnlockList;
    end;
  end;
  // Enters the monitor on fLock and returns the inner list; pair with UnlockList.
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;
  // Removes Item, searching from the beginning of the list.
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    RemoveItem(Item, TDirection.FromBeginning);
  end;
  // Removes Item from the inner list, searching in the given Direction, under the lock.
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  begin
    LockList;
    try
      fList.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  // Stores aValue at position aIndex while holding the lock.
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  begin
    LockList;
    try
      fList[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  // Leaves the monitor entered by LockList.
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  1034. {$ENDIF}
  1035. { TAnonymousThread }
  // Wraps aProc in a TAdvThread; aSynchronize is forwarded to that thread.
  constructor TAnonymousThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    fThread := TAdvThread.Create(aProc,aSynchronize);
  end;
  // Builds an anonymous thread for aProc with the synchronize flag off.
  class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc,False);
  end;
  // Builds an anonymous thread for aProc with the synchronize flag on.
  class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread;
  begin
    Result := TAnonymousThread.Create(aProc,True);
  end;
  // Registers the exception callback on the inner thread; returns Self for chaining.
  function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread;
  begin
    fThread.OnException(aProc);
    Result := Self;
  end;
  // Registers the termination callback (synchronize flag off); returns Self for chaining.
  function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,False);
    Result := Self;
  end;
  // Registers the termination callback (synchronize flag on); returns Self for chaining.
  function TAnonymousThread.OnTerminate_Sync(aProc: TProc): IAnonymousThread;
  begin
    fThread.OnTerminate(aProc,True);
    Result := Self;
  end;
  // Starts the inner thread.
  procedure TAnonymousThread.Start;
  begin
    fThread.Start;
  end;
  1067. { TTask }
  // Creates a pending, disabled task that will run aTaskProc. Every entry of
  // aParamArray becomes a parameter named after its index; aOwnedParams is forwarded
  // to each TParamValue.
  constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
  var
    idx : Integer;
  begin
    fTaskStatus := TWorkTaskStatus.wtsPending;
    fEnabled := False;
    fExecuteProc := aTaskProc;
    fCustomFaultPolicy := False;
    fNumRetries := 0;
    //no callback runs through Synchronize by default
    fExecuteWithSync := False;
    fTerminateWithSync := False;
    fExceptionWithSync := False;
    fFaultControl := TFaultControl.Create;
    fFaultControl.OnRetry := DoRetry;
    fOwnedParams := aOwnedParams;
    fParamList := TParamList.Create(True);
    for idx := Low(aParamArray) to High(aParamArray) do
    begin
      fParamList.Add(TParamValue.Create(idx.ToString,aParamArray[idx],aOwnedParams));
      {$IFDEF FPC}
      fParamList[idx].Value._AddRef;
      {$ENDIF}
    end;
  end;
  // Frees the fault control and the parameter list, plus the result object when the
  // stored result holds one.
  destructor TTask.Destroy;
  begin
    fFaultControl.Free;
    fParamList.Free;
    if not fResult.IsNullOrEmpty then
    begin
      if fResult.IsObject then fResult.AsObject.Free;
    end;
    inherited;
  end;
  // Marks the task as disabled.
  procedure TTask.Disable;
  begin
    fEnabled := False;
  end;
  // Sets the exception status and hands aException to the registered handler; without
  // a handler the exception is raised again.
  // Fix: the workers call this from inside "on E : Exception do" blocks. Raising the
  // object that is currently being handled lets the RTL free it when that handler
  // exits while it is still propagating. Ownership is now taken with
  // AcquireExceptionObject before re-raising the current exception.
  procedure TTask.DoException(aException : Exception);
  begin
    fTaskStatus := TWorkTaskStatus.wtsException;
    if Assigned(fExceptProc) then fExceptProc(Self,aException)
    else
    begin
      if ExceptObject = aException then raise Exception(AcquireExceptionObject)
      else raise aException;
    end;
  end;
  // Runs the initialize step, then the task procedure, repeating while the fault
  // control asks for a retry. A raised exception is acquired and handed to the fault
  // control instead of propagating.
  procedure TTask.DoExecute;
  var
    raised : Exception;
  begin
    fTaskStatus := TWorkTaskStatus.wtsRunning;
    DoInitialize;
    repeat
      try
        if Assigned(fExecuteProc) then fExecuteProc(Self);
        fTaskStatus := TWorkTaskStatus.wtsDone;
        fFaultControl.SuccessExecution;
      except
        on E : Exception do
        begin
          fTaskStatus := TWorkTaskStatus.wtsException;
          //take ownership of the exception so the fault control can keep it
          {$IFNDEF FPC}
            {$IF DELPHIRX10_UP}
            raised := AcquireExceptionObject as Exception;
            {$ELSE}
            raised := Exception(AcquireExceptionObject);
            {$ENDIF}
          {$ELSE}
          raised := Exception(AcquireExceptionObject);
          {$ENDIF}
          fFaultControl.FailedExecution(raised);
        end;
      end;
    until not fFaultControl.NeedToRetry;
  end;
  // Resets the fault control and runs the optional initialize callback. Any exception
  // is re-raised as ETaskInitializationError carrying the original message.
  procedure TTask.DoInitialize;
  begin
    try
      fFaultControl.Reset;
      if Assigned(fInitializeProc) then
      begin
        fInitializeProc(Self);
      end;
    except
      on E : Exception do raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]);
    end;
  end;
  // True when the fault control does not report the task as failed.
  function TTask.Done: Boolean;
  begin
    Result := not fFaultControl.TaskFailed;
  end;
  // True when the fault control reports the task as failed.
  function TTask.Failed: Boolean;
  begin
    Result := fFaultControl.TaskFailed;
  end;
  // Returns the CircuitBreaked flag of the fault control.
  function TTask.CircuitBreaked: Boolean;
  begin
    Result := fFaultControl.CircuitBreaked;
  end;
  // Returns the last exception recorded by the fault control.
  function TTask.LastException: Exception;
  begin
    Result := fFaultControl.LastException;
  end;
  // Returns the maximum number of retries configured in the fault control.
  function TTask.MaxRetries: Integer;
  begin
    Result := fFaultControl.MaxRetries;
  end;
  // Returns the retry counter kept by the fault control.
  function TTask.NumRetries: Integer;
  begin
    Result := fFaultControl.NumRetries;
  end;
  // Retry hook of the fault control: retries go on by default and the optional user
  // callback may set vStopRetries to stop them.
  procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean);
  begin
    vStopRetries := False;
    if Assigned(fRetryProc) then
    begin
      fRetryProc(Self,aRaisedException,vStopRetries);
    end;
  end;
  // Invokes the termination callback when one is registered.
  procedure TTask.DoTerminate;
  begin
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc(Self);
  end;
  // Marks the task as enabled.
  procedure TTask.Enable;
  begin
    fEnabled := True;
  end;
  // Returns the task identifier.
  function TTask.GetIdTask: Int64;
  begin
    Result := fIdTask;
  end;
  // Copies retry count, wait time and multiplier factor from aFaultPolicy into the
  // fault control (created on demand under FPC).
  procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy);
  begin
    {$IFDEF FPC}
    if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create;
    {$ENDIF}
    fFaultControl.MaxRetries := aFaultPolicy.MaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries;
    fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor;
  end;
  // Stores the task identifier.
  procedure TTask.SetIdTask(Value : Int64);
  begin
    fIdTask := Value;
  end;
  // Returns the stored worker number.
  function TTask.GetNumWorker: Integer;
  begin
    Result := fNumWorker;
  end;
  // Returns the value of the parameter stored at position aIndex.
  function TTask.GetParam(aIndex: Integer): TFlexValue;
  begin
    Result := fParamList[aIndex].Value;
  end;
  // Returns the value of the first parameter whose name matches aName
  // (case-insensitive). Raises ETaskParamError when there is no such parameter.
  function TTask.GetParam(const aName: string): TFlexValue;
  var
    candidate : TParamValue;
  begin
    for candidate in fParamList do
    begin
      if CompareText(candidate.Name,aName) <> 0 then Continue;
      Exit(candidate.Value);
    end;
    raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]);
  end;
  // Returns a pointer to the value of the parameter stored at position aIndex.
  function TTask.GetParam2(aIndex: Integer): PFlexValue;
  begin
    Result := @fParamList[aIndex].Value;
  end;
  // Returns the stored task result.
  function TTask.GetResult: TFlexValue;
  begin
    Result := fResult;
  end;
  // Returns whether the task is enabled.
  function TTask.IsEnabled: Boolean;
  begin
    Result := fEnabled;
  end;
  // Stores the worker number and switches the task status to assigned.
  procedure TTask.SetNumWorker(Value: Integer);
  begin
    fNumWorker := Value;
    fTaskStatus := TWorkTaskStatus.wtsAssigned;
  end;
  // Replaces the value of the parameter stored at position aIndex.
  // Raises ETaskParamError when aIndex is outside 0..Count-1.
  // Fix: the previous check (aIndex > Count) let aIndex = Count and negative indexes
  // through, so the list raised its own range error instead of ETaskParamError.
  procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue);
  begin
    if (aIndex < 0) or (aIndex >= fParamList.Count) then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]);
    fParamList[aIndex].Value := Value;
  end;
  // Updates the parameter named aName (case-insensitive) when it exists; otherwise
  // appends a new parameter created with the given ownership flag.
  procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean);
  var
    existing : TParamValue;
  begin
    for existing in fParamList do
    begin
      if CompareText(existing.Name,aName) <> 0 then Continue;
      existing.Value := Value;
      Exit;
    end;
    //no parameter with that name yet
    fParamList.Add(TParamValue.Create(aName,Value,aOwned));
  end;
  // Sets the parameter named aName, passing False as the ownership flag.
  procedure TTask.SetParam(const aName: string; Value: TFlexValue);
  begin
    SetParam(aName,Value,False);
  end;
  // Configures retries, wait time and multiplier on the fault control and marks the
  // task as having a custom fault policy.
  procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := aMaxRetries;
    fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS;
    fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  end;
  // Stores the task result.
  procedure TTask.SetResult(aValue: TFlexValue);
  begin
    fResult := aValue;
  end;
  // Configures one retry per entry of aWaitTimeMSArray, hands the array to the fault
  // control and marks the task as having a custom fault policy.
  procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray<Integer>);
  begin
    fCustomFaultPolicy := True;
    fFaultControl.MaxRetries := Length(aWaitTimeMSArray);
    fFaultControl.WaitTimeBetweenRetriesMS := 0;
    fFaultControl.WaitTimeMultiplierFactor := 1;
    fFaultControl.WaitTimeMSArray := aWaitTimeMSArray;
  end;
  // Returns the current task status.
  function TTask.TaskStatus: TWorkTaskStatus;
  begin
    Result := fTaskStatus;
  end;
  1289. { TWorkTask }
  // Registers the exception callback; returns Self for chaining.
  function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
  begin
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  // Registers the exception callback and sets the exception-with-sync flag.
  function TWorkTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IWorkTask;
  begin
    fExceptionWithSync := True;
    Result := OnException(aTaskProc);
  end;
  // Registers the initialize callback; returns Self for chaining.
  function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  // Registers the retry callback; returns Self for chaining.
  function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask;
  begin
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  // Registers the termination callback; returns Self for chaining.
  function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  // Registers the termination callback and sets the terminate-with-sync flag.
  function TWorkTask.OnTerminated_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    fTerminateWithSync := True;
    Result := OnTerminated(aTaskProc);
  end;
  // Enables the task.
  procedure TWorkTask.Run;
  begin
    fEnabled := True;
  end;
  // Sets the named parameter; returns Self for chaining.
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask;
  begin
    SetParam(aName,aValue);
    Result := Self;
  end;
  // Sets the named parameter with the given ownership flag; returns Self for chaining.
  function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask;
  begin
    SetParam(aName,aValue,aOwned);
    Result := Self;
  end;
  // Retry up to aMaxRetries times with no wait between attempts.
  function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  // Sets a retry policy with MaxRetries = -1 and no wait between attempts.
  function TWorkTask.RetryForever: IWorkTask;
  begin
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  // Retry up to aMaxRetries times, waiting aWaitTimeBetweenRetriesMS with factor 1.
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Retry up to aMaxRetries times with the given wait time and multiplier factor.
  function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask;
  begin
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  // Sets a retry policy driven by an array of wait times.
  function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IWorkTask;
  begin
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  // Sets MaxRetries = -1 with the given wait time and factor 1.
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  // Sets MaxRetries = -1 with the given wait time and multiplier factor.
  function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask;
  begin
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  1369. { TBackgroundTasks }
  // Discards every task still waiting in the queue.
  procedure TBackgroundTasks.CancelAll;
  begin
    fTaskQueue.Clear;
  end;
  // Stores the number of workers and creates the task queue with an infinite insert
  // timeout and a 5000 ms extract timeout. fMaxQueue starts at 0.
  constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aInitialQueueSize : Integer = 100);
  begin
    fConcurrentWorkers := aConcurrentWorkers;
    fNumPushedTasks := 0;
    fMaxQueue := 0;
    fInsertTimeout := INFINITE;
    fExtractTimeout := 5000;
    fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(aInitialQueueSize,fInsertTimeout,fExtractTimeout);
  end;
  // Cancels pending tasks, shuts the queue down so blocked workers wake up, then frees
  // the worker pool and the queue.
  // Fix: fTaskQueue was used before its nil check; when the queue constructor raises
  // (initial size below the queue minimum) the destructor runs with fTaskQueue = nil
  // and would raise an access violation.
  destructor TBackgroundTasks.Destroy;
  begin
    if Assigned(fTaskQueue) then
    begin
      CancelAll;
      fTaskQueue.DoShutDown;
    end;
    if Assigned(fWorkerPool) then fWorkerPool.Free;
    if Assigned(fTaskQueue) then fTaskQueue.Free;
    inherited;
  end;
  // Returns the number of tasks currently queued.
  function TBackgroundTasks.GetTaskQueue: Cardinal;
  begin
    Result := fTaskQueue.QueueSize;
  end;
  // Queues a task without parameters.
  function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
  begin
    Result := AddTask([],False,aTaskProc);
  end;
  // Creates a work task with the given parameters, assigns it the next task id and
  // pushes it to the queue. Returns the task, or nil when the push was not signaled.
  // Raises ETaskAddError when fMaxQueue is set and the queue already holds that many.
  // Fix: Result was left unassigned when the push failed; interface results are not
  // guaranteed to start as nil, so it is now initialised explicitly.
  function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
  var
    worktask : IWorkTask;
  begin
    Result := nil;
    if (fMaxQueue > 0) and (fTaskQueue.QueueSize >= fMaxQueue) then raise ETaskAddError.Create('Max queue reached: Task cannot be added');
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    Inc(fNumPushedTasks);
    worktask.SetIdTask(fNumPushedTasks);
    if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
    begin
      Result := worktask;
    end;
  end;
  // Queues a task with parameters and flags it to execute through Synchronize.
  function TBackgroundTasks.AddTask_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    queued : IWorkTask;
  begin
    queued := AddTask(aParamArray,aOwnedParams,aTaskProc);
    Result := queued;
    TTask(Result).ExecuteWithSync := True;
  end;
  // Queues a parameterless task flagged to execute through Synchronize.
  function TBackgroundTasks.AddTask_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    Result := AddTask_Sync([],False,aTaskProc);
  end;
  // Replaces the worker pool with fConcurrentWorkers fresh queue workers (numbered
  // from 1) and starts each of them.
  procedure TBackgroundTasks.Start;
  var
    workerNum : Integer;
    queueWorker : TWorker;
  begin
    if fWorkerPool <> nil then fWorkerPool.Free;
    fWorkerPool := TWorkerPool.Create(True);
    for workerNum := 1 to fConcurrentWorkers do
    begin
      queueWorker := TQueueWorker.Create(workerNum,fTaskQueue);
      fWorkerPool.Add(queueWorker);
      queueWorker.Start;
    end;
  end;
  1437. { TWorker }
  // Creates the thread suspended with a default fault policy; the worker is not
  // freed automatically on termination.
  constructor TWorker.Create;
  begin
    inherited Create(True);
    FreeOnTerminate := False;
    fStatus := TWorkerStatus.wsSuspended;
    fDefaultFaultPolicy := TFaultPolicy.Create;
  end;
  // Frees the default fault policy.
  destructor TWorker.Destroy;
  begin
    fDefaultFaultPolicy.Free;
    inherited;
  end;
  // Applies the worker's default fault policy unless the task has a custom one.
  procedure TWorker.SetFaultPolicy(aTask: TTask);
  begin
    if aTask.CustomFaultPolicy then Exit;
    aTask.SetFaultPolicy(fDefaultFaultPolicy);
  end;
  // Intentionally empty in the base class.
  procedure TWorker.Execute;
  begin
  end;
  // Runs the current task; parameterless so it can be passed to Synchronize.
  procedure TWorker.ExecuteTask;
  begin
    fCurrentTask.DoExecute;
  end;
  // Runs the termination step of the current task; parameterless so it can be passed
  // to Synchronize.
  procedure TWorker.TerminateTask;
  begin
    fCurrentTask.DoTerminate;
  end;
  1465. { TSimpleWorker }
  // Creates a worker bound to a single task; the thread frees itself on termination.
  constructor TSimpleWorker.Create(aTask : ITask; aRunOnce : Boolean = True);
  begin
    inherited Create;
    FreeOnTerminate := True;
    fCurrentTask := aTask;
    fRunOnce := aRunOnce;
  end;
  // Thread body: while not terminated, runs the bound task whenever it is assigned and
  // enabled, routes exceptions to the task's exception handling, then runs the
  // termination step. With fRunOnce the thread terminates after the first run.
  procedure TSimpleWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      //nothing to run until a task is assigned and enabled
      if (fCurrentTask = nil) or (not fCurrentTask.IsEnabled) then Continue;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          SetFaultPolicy(TTask(fCurrentTask));
          if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
          else fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
            else raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        fStatus := TWorkerStatus.wsIdle;
        try
          if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
          else fCurrentTask.DoTerminate;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E);
          end;
        end;
        if fRunOnce then Terminate;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1505. { TQueueWorker }
  // Creates a worker that takes its tasks from aTaskQueue.
  constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue);
  begin
    inherited Create;
    fTaskQueue := aTaskQueue;
    fNumWorker := aNumWorker;
  end;
  // Thread body: pops tasks from the queue until terminated. Each task is executed
  // (through Synchronize when requested), exceptions are routed to the task's
  // exception handling, and the termination step always runs afterwards.
  procedure TQueueWorker.Execute;
  begin
    fStatus := TWorkerStatus.wsIdle;
    while not Terminated do
    begin
      fCurrentTask := fTaskQueue.PopItem;
      //no task returned by the pop: try again
      if fCurrentTask = nil then Continue;
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          fCurrentIdTask := fCurrentTask.GetIdTask;
          SetFaultPolicy(TTask(fCurrentTask));
          if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask)
          else fCurrentTask.DoExecute;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
            else raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
        else fCurrentTask.DoTerminate;
        fStatus := TWorkerStatus.wsIdle;
      end;
    end;
    fStatus := TWorkerStatus.wsSuspended
  end;
  1541. { TScheduledWorker }
  // Creates a self-freeing worker for one scheduled task; outside Delphi/Linux the
  // thread is named after the task for debugging.
  constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
  begin
    inherited Create;
    {$IFNDEF DELPHILINUX}
    NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask);
    {$ENDIF}
    FreeOnTerminate := True;
    fCurrentTask := aScheduledTask;
  end;
  // Thread body: runs the scheduled task once, routes exceptions to the task's
  // exception handling, runs the termination step and, when the task reports it is
  // finished, its expiration step. The task reference is released at the end.
  procedure TScheduledWorker.Execute;
  var
    useSync : Boolean;
  begin
    fStatus := TWorkerStatus.wsIdle;
    if Assigned(fCurrentTask) then
    begin
      try
        fStatus := TWorkerStatus.wsWorking;
        try
          SetFaultPolicy(TTask(fCurrentTask));
          useSync := TTask(fCurrentTask).ExecuteWithSync;
          if useSync then Synchronize(ExecuteTask)
          else fCurrentTask.DoExecute;
          fStatus := TWorkerStatus.wsIdle;
        except
          on E : Exception do
          begin
            if fCurrentTask <> nil then fCurrentTask.DoException(E)
            else raise ETaskExecutionError.Create(e.Message);
          end;
        end;
      finally
        if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask)
        else fCurrentTask.DoTerminate;
        //run the expiration step once the schedule is finished
        if (fCurrentTask as IScheduledTask).IsFinished then
        begin
          if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask)
          else (fCurrentTask as IScheduledTask).DoExpire;
        end;
      end;
    end;
    fCurrentTask := nil;
    fStatus := TWorkerStatus.wsSuspended;
  end;
  // Runs the expiration step of the current task; parameterless so it can be passed
  // to Synchronize.
  procedure TScheduledWorker.ExpireTask;
  begin
    (fCurrentTask as IScheduledTask).DoExpire;
  end;
  1588. { TScheduledTasks }
  // Registers a named task without parameters.
  function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
  begin
    Result := AddTask(aTaskName,[],False,aTaskProc);
  end;
  // Creates a scheduled task with the given name and parameters, assigns it the next
  // task id, adds it to the task list and returns it.
  function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    newTask : TScheduledTask;
  begin
    newTask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
    newTask.Name := aTaskName;
    Inc(fNumPushedTasks);
    newTask.SetIdTask(fNumPushedTasks);
    fTaskList.Add(newTask);
    Result := newTask;
  end;
  // Registers a named task with parameters and flags it to execute through Synchronize.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IScheduledTask;
  var
    added : IScheduledTask;
  begin
    added := AddTask(aTaskName,aParamArray,aOwnedParams,aTaskProc);
    Result := added;
    TTask(Result).ExecuteWithSync := True;
  end;
  // Registers a named parameterless task flagged to execute through Synchronize.
  function TScheduledTasks.AddTask_Sync(const aTaskName: string; aTaskProc: TTaskProc): IScheduledTask;
  begin
    Result := AddTask_Sync(aTaskName,[],False,aTaskProc);
  end;
  // Creates the task list and the fault policy; the scheduler is not started yet.
  constructor TScheduledTasks.Create;
  begin
    fIsStarted := False;
    fNumPushedTasks := 0;
    fFaultPolicy := TFaultPolicy.Create;
    fTaskList := TScheduledTaskList.Create;
  end;
  // Terminates the scheduler thread and waits for it before freeing it, then frees
  // the task list and the fault policy.
  destructor TScheduledTasks.Destroy;
  begin
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      fScheduler.Free;
    end;
    fTaskList.Free;
    fFaultPolicy.Free;
    inherited;
  end;
  // Returns the scheduled task with the given id, as resolved by the scheduler.
  // Raises ETaskSchedulerError when the scheduler has not been created yet.
  // Fix: unlike the name-based overload, this one dereferenced fScheduler without
  // checking it, raising an access violation before Start was called.
  function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
  begin
    if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    Result := fScheduler.Get(aIdTask);
  end;
  function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
  begin
    //lookup is delegated to the scheduler, which only exists once Start was called
    if fScheduler = nil then
    begin
      raise ETaskSchedulerError.Create('Scheduler must be started to get a task!');
    end;
    Result := fScheduler.Get(aTaskName);
  end;
  procedure TScheduledTasks.Start;
  begin
    if fIsStarted then Exit;
    //a scheduler that already exists while we are not started was terminated by Stop;
    //a terminated thread cannot be started again, so wait for it to end and rebuild it
    if Assigned(fScheduler) then
    begin
      fScheduler.Terminate;
      fScheduler.WaitFor;
      FreeAndNil(fScheduler);
    end;
    fScheduler := TScheduler.Create(fTaskList);
    fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
    fScheduler.Start;
    fIsStarted := True;
  end;
  procedure TScheduledTasks.Stop;
  begin
    //mark as stopped and ask the scheduler thread (if any) to terminate; does not wait for it
    fIsStarted := False;
    if fScheduler <> nil then
    begin
      fScheduler.Terminate;
    end;
  end;
  1657. { TScheduledTask }
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask;
  begin
    //store the parameter and return the task itself for fluent chaining
    SetParam(aName,aValue);
    Result := Self;
  end;
  function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask;
  begin
    Result := Self;
    //forward the ownership flag; it was previously dropped, so aOwned had no effect
    //NOTE(review): relies on the inherited SetParam(aName,aValue,aOwned) overload - confirm it is declared on the ancestor
    SetParam(aName,aValue,aOwned);
  end;
  function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run at the given date
    ClearSchedule;
    fStartDate := aStartDate;
    fNextExecution := aStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartInMinutes(aMinutes: Word): IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run aMinutes from now
    ClearSchedule;
    fStartDate := IncMinute(Now(),aMinutes);
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartInSeconds(aSeconds: Word): IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run aSeconds from now
    ClearSchedule;
    fStartDate := IncSecond(Now(),aSeconds);
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartNow: IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run at the current time
    ClearSchedule;
    fStartDate := Now();
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartOnDayChange: IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run for tomorrow at 0:0:0
    ClearSchedule;
    fStartDate := ChangeTimeOfADay(Tomorrow(),0,0,0);
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartTodayAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run today at the given time of day
    ClearSchedule;
    fStartDate := ChangeTimeOfADay(Now(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0): IScheduledTask;
  begin
    //reset any previous schedule, then plan a single run tomorrow at the given time of day
    ClearSchedule;
    fStartDate := ChangeTimeOfADay(Tomorrow(),aHour,aMinute,aSecond);
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    Result := Self;
  end;
  function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask;
  begin
    //retry policy: aMaxRetries, wait time 0, multiplier 1
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  function TScheduledTask.RetryForever: IScheduledTask;
  begin
    //retry policy: max retries -1, wait time 0, multiplier 1
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    //retry policy with a fixed wait time (multiplier 1)
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask;
  begin
    //retry policy with caller-supplied wait time and multiplier factor
    SetRetryPolicy(aMaxRetries,
                   aWaitTimeBetweenRetriesMS,
                   aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray<Integer>): IScheduledTask;
  begin
    //retry policy driven by an array of wait times
    SetRetryPolicy(aWaitTimeArray);
    Result := Self;
  end;
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask;
  begin
    //retry policy: max retries -1, fixed wait time (multiplier 1)
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask;
  begin
    //retry policy: max retries -1, caller-supplied wait time and multiplier factor
    SetRetryPolicy(-1,
                   aWaitTimeBetweenRetriesMS,
                   aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
  begin
    //no start date set yet: start from a clean schedule
    if fStartDate = 0.0 then
    begin
      ClearSchedule;
    end;
    fTimeInterval := aInterval;
    fTimeMeasure := aTimeMeasure;
    fScheduleMode := TScheduleMode.smRepeatMode;
    //a start date in the past is moved to the current time
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fEnabled := True;
  end;
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
  begin
    //same setup as the two-argument overload (which may reset the schedule first)...
    RepeatEvery(aInterval,aTimeMeasure);
    //...plus an expiration date
    fExpirationDate := aEndTime;
  end;
  procedure TScheduledTask.RepeatEveryDay;
  const
    ONE_DAY = 1;
  begin
    //repeat with an interval of one day
    RepeatEvery(ONE_DAY,tmDays);
  end;
  procedure TScheduledTask.RepeatEveryWeek;
  const
    DAYS_PER_WEEK = 7;
  begin
    //repeat with an interval of seven days
    RepeatEvery(DAYS_PER_WEEK,tmDays);
  end;
  procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
  begin
    //same setup as the two-argument overload (which may reset the schedule first)...
    RepeatEvery(aInterval,aTimeMeasure);
    //...plus an execution-count limit checked by CheckSchedule
    fExpirationTimes := aRepeatTimes;
  end;
  procedure TScheduledTask.RunOnce;
  begin
    //a start date in the past is moved to the current time
    if fStartDate < Now() then
    begin
      fStartDate := Now();
    end;
    fNextExecution := fStartDate;
    fScheduleMode := TScheduleMode.smRunOnce;
    fEnabled := True;
  end;
  procedure TScheduledTask.Cancel;
  begin
    //flag the task as finished (reported by IsFinished)
    Self.fFinished := True;
  end;
  function TScheduledTask.CheckSchedule: Boolean;
  var
    limitreached : Boolean;
  begin
    //Returns True when the task must be launched now; also updates the
    //execution counters, the next execution time and the finished flag.
    Result := False;
    //a task still running is never triggered again
    if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;

    if fScheduleMode = TScheduleMode.smRunOnce then
    begin
      //one-shot: fire only if never executed and the start date was reached
      if (fExecutionTimes <> 0) or (Now() < fNextExecution) then Exit;
      fLastExecution := Now();
      Inc(fExecutionTimes);
      fFinished := True;
      Result := True;
      Exit;
    end;

    //repeat mode: nothing to do until next execution is reached
    if Now() < fNextExecution then Exit;

    //check expiration limits (by number of executions or by date)
    limitreached := ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
                    ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate));
    if limitreached then
    begin
      fFinished := True;
      Exit;
    end;

    //calculate next execution
    case fTimeMeasure of
      tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
      tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
      tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
      tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      tmMilliseconds : fNextExecution := IncMilliSecond(fNextExecution, fTimeInterval);
    end;

    //avoid execution if system time was altered (new next execution is already in the past)
    if Now() > fNextExecution then Exit;

    fLastExecution := Now();
    Inc(fExecutionTimes);
    Result := True;
  end;
  procedure TScheduledTask.ClearSchedule;
  begin
    inherited;
    //back to the defaults: run-once mode, seconds, zero interval
    fScheduleMode := TScheduleMode.smRunOnce;
    fTimeMeasure := TTimeMeasure.tmSeconds;
    fTimeInterval := 0;
    //forget every date
    fStartDate := 0.0;
    fNextExecution := 0.0;
    fLastExecution := 0.0;
    fExpirationDate := 0.0;
    fFinished := False;
  end;
  procedure TScheduledTask.DoExpire;
  begin
    //notify the expiration callback (if any) and then disable the task
    if Assigned(fExpiredProc) then
    begin
      fExpiredProc(Self);
    end;
    fEnabled := False;
  end;
  function TScheduledTask.GetCurrentSchedule: TPair<TTimeMeasure, Integer>;
  begin
    //pair of (time measure, interval) currently configured
    Result.Key := fTimeMeasure;
    Result.Value := fTimeInterval;
  end;
  function TScheduledTask.GetTaskName: string;
  begin
    //task name as stored in fName
    Result := Self.fName;
  end;
  function TScheduledTask.IsFinished: Boolean;
  begin
    //True once the finished flag was set (Cancel or CheckSchedule)
    Result := Self.fFinished;
  end;
  function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    //register the exception callback; returns the task for fluent chaining
    Result := Self;
    fExceptProc := aTaskProc;
  end;
  function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): IScheduledTask;
  begin
    //register the exception callback and flag it to be called with sync
    Result := OnException(aTaskProc);
    //OnException returns Self, so set the flag on this object directly rather than
    //hard-casting the interface reference back to a class (compiler-dependent behaviour)
    ExceptionWithSync := True;
  end;
  function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask;
  begin
    //register the retry callback; returns the task for fluent chaining
    Result := Self;
    fRetryProc := aTaskProc;
  end;
  function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
  begin
    //register the expiration callback (invoked from DoExpire); returns the task for chaining
    Result := Self;
    fExpiredProc := aTaskProc;
  end;
  function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    //register the expiration callback and flag it to be called with sync
    Result := OnExpired(aTaskProc);
    //OnExpired returns Self, so set the flag on this object directly rather than
    //hard-casting the interface reference back to a class (compiler-dependent behaviour)
    ExpireWithSync := True;
  end;
  function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask;
  begin
    //register the initialization callback; returns the task for fluent chaining
    Result := Self;
    fInitializeProc := aTaskProc;
  end;
  function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
  begin
    //register the termination callback; returns the task for fluent chaining
    Result := Self;
    fTerminateProc := aTaskProc;
  end;
  function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask;
  begin
    //register the termination callback and flag it to be called with sync
    Result := OnTerminated(aTaskProc);
    //OnTerminated returns Self, so set the flag on this object directly rather than
    //hard-casting the interface reference back to a class (compiler-dependent behaviour)
    TerminateWithSync := True;
  end;
  1924. { TScheduler }
  constructor TScheduler.Create(aTaskList : TScheduledTaskList);
  begin
    //thread is created suspended and is not auto-freed
    inherited Create(True);
    FreeOnTerminate := False;
    //the task list is received from the caller, not created here
    fTaskList := aTaskList;
    fRemoveTaskAfterExpiration := False;
    fListLock := TCriticalSection.Create;
    {$IFDEF FPC}
    fCondVar := TSimpleEvent.Create;
    {$ELSE}
    fCondVar := TSimpleEvent.Create(nil,True,False,'');
    {$ENDIF}
  end;
  destructor TScheduler.Destroy;
  begin
    //signal the event before releasing it
    fCondVar.SetEvent;
    fCondVar.Free;
    //the list is only referenced here, so it is dropped without being freed
    fTaskList := nil;
    fListLock.Free;
    inherited;
  end;
  procedure TScheduler.Execute;
  var
    task : IScheduledTask;
    worker : TScheduledWorker;
    numworker : Int64;
    idx : Integer;
  begin
    //Scheduler loop: every 250 ms walks the task list under the list lock,
    //launches a worker for each enabled task whose schedule is due and
    //(optionally) removes disabled tasks.
    numworker := 0;
    while not Terminated do
    begin
      fListLock.Enter;
      try
        //index-based walk: removing an item inside a for-in loop shifts the
        //remaining items and makes the enumerator skip the next task
        idx := 0;
        while idx < fTaskList.Count do
        begin
          task := fTaskList[idx];
          if (task.IsEnabled) and (not task.IsFinished) then
          begin
            if task.CheckSchedule then
            begin
              Inc(numworker);
              worker := TScheduledWorker.Create(numworker,task);
              worker.Start;
            end;
          end
          else if (not task.IsEnabled) and (fRemoveTaskAfterExpiration) then
          begin
            //do not advance idx: the next task now occupies this position
            fTaskList.Delete(idx);
            Continue;
          end;
          Inc(idx);
        end;
        task := nil;
      finally
        fListLock.Leave;
      end;
      fCondVar.WaitFor(250);
    end;
  end;
  function TScheduler.Add(aTask: TScheduledTask): Integer;
  begin
    //Adds a task to the list and returns the value returned by the list's Add.
    //Guarded by fListLock like every other access to fTaskList in this class,
    //so the list is not modified while Execute/Get are walking it.
    fListLock.Enter;
    try
      Result := fTaskList.Add(aTask);
    finally
      fListLock.Leave;
    end;
  end;
  function TScheduler.Get(aIdTask: Int64): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //Returns the task whose IdTask equals aIdTask, or nil when none matches.
    //Result must be set explicitly: otherwise it is left unassigned on a miss
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if task.IdTask = aIdTask then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  function TScheduler.Get(const aTaskName: string): IScheduledTask;
  var
    task : IScheduledTask;
  begin
    //Returns the first task whose Name matches aTaskName (case-insensitive
    //CompareText), or nil when none matches.
    //Result must be set explicitly: otherwise it is left unassigned on a miss
    Result := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if CompareText(task.Name,aTaskName) = 0 then Exit(task);
      end;
    finally
      fListLock.Leave;
    end;
  end;
  2012. { TAdvThread }
  constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean);
  begin
    //created suspended; the thread frees itself on termination
    inherited Create(True);
    fExecuteProc := aProc;
    fExecuteWithSync := aSynchronize;
    FreeOnTerminate := True;
  end;
  procedure TAdvThread.DoExecute;
  begin
    //run the user procedure; exceptions go to the exception callback when one
    //is assigned, otherwise they are re-raised
    try
      if Assigned(fExecuteProc) then fExecuteProc;
    except
      on E : Exception do
      begin
        if not Assigned(fExceptionProc) then raise;
        //the exception object is acquired and handed over to the callback
        {$IFNDEF FPC}
          {$IF DELPHIRX10_UP}
          fExceptionProc(AcquireExceptionObject as Exception);
          {$ELSE}
          fExceptionProc(Exception(AcquireExceptionObject));
          {$ENDIF}
        {$ELSE}
        fExceptionProc(Exception(AcquireExceptionObject));
        {$ENDIF}
      end;
    end;
  end;
  procedure TAdvThread.CallToTerminate;
  begin
    //invoke the terminate callback when one was registered
    if not Assigned(fTerminateProc) then Exit;
    fTerminateProc;
  end;
  procedure TAdvThread.DoTerminate;
  begin
    //call the terminate callback directly or through Synchronize, as configured
    if not fTerminateWithSync then
    begin
      CallToTerminate;
    end
    else
    begin
      Synchronize(CallToTerminate);
    end;
  end;
  procedure TAdvThread.Execute;
  begin
    //run the work directly or through Synchronize, as configured
    if not fExecuteWithSync then
    begin
      DoExecute;
    end
    else
    begin
      Synchronize(DoExecute);
    end;
  end;
  procedure TAdvThread.OnException(aProc: TAnonExceptionProc);
  begin
    //callback used by DoExecute when the executed procedure raises
    Self.fExceptionProc := aProc;
  end;
  procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean);
  begin
    //callback used by DoTerminate, and whether it must go through Synchronize
    fTerminateProc := aProc;
    fTerminateWithSync := aSynchronize;
  end;
  2063. { TRunTask }
  class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask;
  begin
    //parameterless variant: empty param array, nothing owned
    Result := Execute([],
                      False,
                      aTaskProc);
  end;
  class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask;
  begin
    //parameterless variant: empty param array, nothing owned
    Result := Execute_Sync([],
                           False,
                           aTaskProc);
  end;
  class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    worktask : TWorkTask;
    runner : TSimpleWorker;
  begin
    //build the task (executed without sync) and start a simple worker on it
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    worktask.ExecuteWithSync := False;
    Result := worktask;
    runner := TSimpleWorker.Create(worktask);
    runner.Start;
  end;
  class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask;
  var
    worktask : TWorkTask;
    runner : TSimpleWorker;
  begin
    //build the task (executed with sync) and start a simple worker on it
    worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
    worktask.ExecuteWithSync := True;
    Result := worktask;
    runner := TSimpleWorker.Create(worktask);
    runner.Start;
  end;
  2094. { TAsyncTask }
  constructor TAsyncTask.Create(aAction : TProc);
  const
    CREATE_SUSPENDED = False;
    FREE_ON_TERMINATE = True;
  begin
    //the action runs in a TSimpleThread that starts immediately and frees itself
    fProcess := TSimpleThread.Create(aAction,CREATE_SUSPENDED,FREE_ON_TERMINATE);
  end;
  destructor TAsyncTask.Destroy;
  begin
    //nothing of its own to release: fProcess was created with free-on-terminate
    inherited;
  end;
  class function TAsyncTask.Run(const aAction : TProc) : IAsyncTask;
  var
    asynctask : TAsyncTask;
  begin
    //factory: creates the task (which launches aAction) and returns it as interface
    asynctask := TAsyncTask.Create(aAction);
    Result := asynctask;
  end;
  procedure TAsyncTask.Wait(const aTimeout: Cardinal);
  begin
    //a timeout of 0 means wait without timeout
    if aTimeout <> 0 then
    begin
      fProcess.WaitFor(aTimeout);
    end
    else
    begin
      fProcess.WaitFor;
    end;
  end;
  procedure TAsyncTask.Wait;
  begin
    //wait for the worker thread without timeout
    Self.fProcess.WaitFor;
  end;
  2116. { TAsyncTask<T> }
  2117. {$IFNDEF FPC}
  constructor TAsyncTask<T>.Create(aAction: TFunc<T>);
  begin
    fWaitForResult := False;
    //the thread starts immediately and is not auto-freed; it stores the
    //function result in fTaskResult
    fProcess := TSimpleThread.Create(
      procedure
      begin
        fTaskResult := aAction();
      end,
      False,
      False);
  end;
  destructor TAsyncTask<T>.Destroy;
  begin
    //nobody asked for the result: let the thread free itself when it ends
    if not fWaitForResult then
    begin
      fProcess.FreeOnTerminate := True;
    end;
    inherited;
  end;
  class function TAsyncTask<T>.Run(const aAction: TFunc<T>): IAsyncTask<T>;
  var
    asynctask : TAsyncTask<T>;
  begin
    //factory: creates the task (which launches aAction) and returns it as interface
    asynctask := TAsyncTask<T>.Create(aAction);
    Result := asynctask;
  end;
  function TAsyncTask<T>.Result: T;
  begin
    //Waits for the worker thread to finish and returns the value it produced.
    fWaitForResult := True;
    //the thread is released after the first call; guard and nil the reference
    //so a second call returns the stored value instead of using a freed object
    if Assigned(fProcess) then
    begin
      fProcess.WaitFor;
      FreeAndNil(fProcess);
    end;
    Result := fTaskResult;
  end;
  function TAsyncTask<T>.Result(const aTimeout: Cardinal): T;
  begin
    //Waits up to aTimeout for the worker thread and returns the value it produced.
    //TSimpleThread.WaitFor(aTimeout) raises an Exception on timeout, in which
    //case the thread reference is kept.
    fWaitForResult := True;
    //the thread is released after the first successful call; guard and nil the
    //reference so a later call returns the stored value instead of using a freed object
    if Assigned(fProcess) then
    begin
      fProcess.WaitFor(aTimeout);
      FreeAndNil(fProcess);
    end;
    Result := fTaskResult;
  end;
  2149. {$ENDIF}
  2150. { TParamValue }
  constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean);
  begin
    inherited Create;
    //named value plus the flag telling Destroy whether an object value must be freed
    fOwned := aOwnedValue;
    fValue := aValue;
    fName := aName;
  end;
  constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean);
  begin
    inherited Create;
    //same as the TFlexValue overload, the TVarRec is assigned to the flex value
    fOwned := aOwnedValue;
    fValue := aValue;
    fName := aName;
  end;
  constructor TParamValue.Create;
  begin
    //empty parameter: no name, value not owned
    fOwned := False;
    fName := '';
  end;
  destructor TParamValue.Destroy;
  begin
    {$IFDEF FPC}
    fValue._Release;
    {$ENDIF}
    //an owned, non-nil object value is released together with the parameter
    if fOwned then
    begin
      if (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free;
    end;
    inherited;
  end;
  2178. { TBackgroundWorkers }
  constructor TBackgroundWorkers.Create(aConcurrentWorkers : Integer; aWorkerProc : TTaskProc);
  begin
    //pool created with True as its constructor argument
    fWorkerPool := TWorkerPool.Create(True);
    //number of workers Start will launch and the procedure each one runs
    fWorkerExecuteProc := aWorkerProc;
    fConcurrentWorkers := aConcurrentWorkers;
  end;
  destructor TBackgroundWorkers.Destroy;
  begin
    fWorkerPool.Free;
    //release the fault policy if one was ever assigned (Free is nil-safe)
    //NOTE(review): assumes fFaultPolicy is a TFaultPolicy owned by this object - confirm against the class declaration
    fFaultPolicy.Free;
    inherited;
  end;
  procedure TBackgroundWorkers.Start;
  var
    workernum : Integer;
    bgworker : TWorker;
    worktask : IWorkTask;
  begin
    //launch fConcurrentWorkers workers, each with its own task wired to the
    //callbacks registered on this object
    for workernum := 1 to fConcurrentWorkers do
    begin
      worktask := TWorkTask.Create([],False,fWorkerExecuteProc)
                    .OnInitialize(fWorkerInitProc)
                    .OnRetry(fWorkerRetryProc)
                    .OnException(fWorkerExceptionProc)
                    .OnTerminated(fWorkerTerminateProc);
      worktask.NumWorker := workernum;
      worktask.Run;
      bgworker := TSimpleWorker.Create(worktask,False);
      fWorkerPool.Add(bgworker);
      bgworker.Start;
    end;
  end;
  procedure TBackgroundWorkers.Stop;
  var
    worker : TWorker;
  begin
    //Terminates every worker in the pool, waits for it and removes it.
    //The pool is drained from the front instead of with for-in: removing an
    //item while enumerating shifts the rest and left every other worker running.
    while fWorkerPool.Count > 0 do
    begin
      worker := fWorkerPool[0];
      worker.Terminate;
      worker.WaitFor;
      fWorkerPool.Delete(0);
    end;
  end;
  function TBackgroundWorkers.OnException(aTaskProc: TTaskExceptionProc): TBackgroundWorkers;
  begin
    //exception callback handed to each worker task in Start; returns Self for chaining
    fWorkerExceptionProc := aTaskProc;
    Result := Self;
  end;
  function TBackgroundWorkers.OnInitialize(aTaskProc: TTaskProc): TBackgroundWorkers;
  begin
    //initialize callback handed to each worker task in Start; returns Self for chaining
    fWorkerInitProc := aTaskProc;
    Result := Self;
  end;
  function TBackgroundWorkers.OnRetry(aTaskProc: TTaskRetryProc): TBackgroundWorkers;
  begin
    //retry callback handed to each worker task in Start; returns Self for chaining
    fWorkerRetryProc := aTaskProc;
    Result := Self;
  end;
  function TBackgroundWorkers.OnTerminated(aTaskProc: TTaskProc): TBackgroundWorkers;
  begin
    //terminate callback handed to each worker task in Start; returns Self for chaining
    fWorkerTerminateProc := aTaskProc;
    Result := Self;
  end;
  function TBackgroundWorkers.Retry(aMaxRetries: Integer): TBackgroundWorkers;
  begin
    //retry policy: aMaxRetries, wait time 0, multiplier 1
    SetRetryPolicy(aMaxRetries,0,1);
    Result := Self;
  end;
  function TBackgroundWorkers.RetryForever: TBackgroundWorkers;
  begin
    //retry policy: max retries -1, wait time 0, multiplier 1
    SetRetryPolicy(-1,0,1);
    Result := Self;
  end;
  procedure TBackgroundWorkers.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double);
  begin
    //Stores the retry settings in the fault policy.
    //No visible TBackgroundWorkers method creates fFaultPolicy, so create it on
    //first use instead of dereferencing nil
    //NOTE(review): assumes the field is a TFaultPolicy owned by this object - confirm it is also freed in Destroy
    if not Assigned(fFaultPolicy) then fFaultPolicy := TFaultPolicy.Create;
    fFaultPolicy.MaxRetries := aMaxRetries;
    fFaultPolicy.WaitTimeBetweenRetries := aWaitTimeBetweenRetriesMS;
    fFaultPolicy.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor;
  end;
  function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  begin
    //retry policy with caller-supplied wait time and multiplier factor
    SetRetryPolicy(aMaxRetries,
                   aWaitTimeBetweenRetriesMS,
                   aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  function TBackgroundWorkers.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  begin
    //retry policy with a fixed wait time (multiplier 1)
    SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): TBackgroundWorkers;
  begin
    //retry policy: max retries -1, fixed wait time (multiplier 1)
    SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,1);
    Result := Self;
  end;
  function TBackgroundWorkers.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): TBackgroundWorkers;
  begin
    //retry policy: max retries -1, caller-supplied wait time and multiplier factor
    SetRetryPolicy(-1,
                   aWaitTimeBetweenRetriesMS,
                   aWaitTimeMultiplierFactor);
    Result := Self;
  end;
  2277. { TSimpleThread }
  constructor TSimpleThread.Create(aProc: TProc; aCreateSuspended, aFreeOnTerminate : Boolean);
  begin
    //a procedure to run is mandatory
    if not Assigned(aProc) then
    begin
      raise EArgumentNilException.Create('param cannot be nil!');
    end;
    fExecuteProc := aProc;
    //event signalled by Execute once the procedure has run; starts non-signalled
    {$IFNDEF FPC}
    fTimeoutFlag := TLightweightEvent.Create;
    {$ELSE}
    fTimeoutFlag := TSimpleEvent.Create;
    {$ENDIF}
    fTimeoutFlag.ResetEvent;
    inherited Create(aCreateSuspended);
    Self.FreeOnTerminate := aFreeOnTerminate;
  end;
  destructor TSimpleThread.Destroy;
  begin
    if Assigned(fTimeoutFlag) then fTimeoutFlag.Release;
    //run the inherited destructor first: Execute signals fTimeoutFlag as its
    //last step, so the event must outlive a thread that is still running
    //NOTE(review): relies on the inherited TThread destructor waiting for an unfinished thread
    inherited;
    //Free is nil-safe (the constructor may have raised before creating the event)
    fTimeoutFlag.Free;
  end;
  procedure TSimpleThread.Execute;
  begin
    //Runs the procedure and then signals the event WaitFor(aTimeout) waits on.
    //try..finally: signal even when the procedure raises, otherwise waiters
    //would block until their timeout although the thread has already ended
    try
      fExecuteProc;
    finally
      fTimeoutFlag.SetEvent;
    end;
  end;
  function TSimpleThread.WaitFor(const aTimeout: Cardinal) : TWaitResult;
  var
    waitresult : TWaitResult;
  begin
    //wait on the event Execute signals; a timeout is reported as an exception
    waitresult := fTimeoutFlag.WaitFor(aTimeout);
    Result := waitresult;
    if waitresult = TWaitResult.wrTimeout then
    begin
      raise Exception.Create('Timeout');
    end;
  end;
  2310. end.