12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432 |
- { ***************************************************************************
- Copyright (c) 2016-2019 Kike Pérez
- Unit : Quick.Threads
- Description : Thread safe collections
- Author : Kike Pérez
- Version : 1.4
- Created : 09/03/2018
- Modified : 14/01/2019
- This file is part of QuickLib: https://github.com/exilon/QuickLib
- ***************************************************************************
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- *************************************************************************** }
- unit Quick.Threads;
- {$i QuickLib.inc}
- interface
- uses
- Classes,
- //rtti,
- Types,
- SysUtils,
- DateUtils,
- //Quick.Chrono,
- Quick.Value,
- {$IFNDEF FPC}
- System.RTLConsts,
- System.Generics.Collections,
- System.SyncObjs;
- {$ELSE}
- RtlConsts,
- Generics.Collections,
- syncobjs;
- {$ENDIF}
- type
- // Bounded blocking FIFO queue kept in a circular buffer and guarded by a
- // critical section. Producers wait while the buffer is full, consumers wait
- // while it is empty; both waits honour the timeouts given to Create.
- TThreadedQueueCS<T> = class
- private
- // Circular buffer; its length is the queue capacity.
- FQueue: array of T;
- // FQueueSize = number of stored items, FQueueOffset = index of the oldest item.
- FQueueSize, FQueueOffset: Integer;
- FQueueLock: TCriticalSection;
- // Wake-up primitive: an event object on FPC, a condition variable on Delphi.
- {$IFDEF FPC}
- FQueueCondVar : TEventObject;
- {$ELSE}
- FQueueCondVar: TConditionVariableCS;
- {$ENDIF}
- // Set by DoShutDown; makes waiting push/pop calls return.
- FShutDown: Boolean;
- FPushTimeout, FPopTimeout: Cardinal;
- // Running counters, incremented on every successful push/pop.
- FTotalItemsPushed, FTotalItemsPopped: Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- // Changes the capacity by ADelta slots.
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- procedure Clear;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- // Blocking FIFO queue backed by a TQueue<T> and guarded by a critical section.
- // In the implementation visible in this unit PushItem never waits (its wait
- // loop is commented out); only PopItem blocks while the queue is empty.
- TThreadedQueueList<T> = class
- private
- fQueue : TQueue<T>;
- // Item counter maintained by PushItem/PopItem alongside fQueue.
- fQueueSize : Integer;
- fQueueLock : TCriticalSection;
- // Wake-up primitive: a simple event on FPC, a condition variable on Delphi.
- {$IFDEF FPC}
- FQueueCondVar : TSimpleEvent;
- {$ELSE}
- FQueueCondVar: TConditionVariableCS;
- {$ENDIF}
- // Set by DoShutDown; makes PopItem stop waiting and PushItem refuse items.
- fShutDown : Boolean;
- // Stored by Create; only referenced from commented-out code in this unit.
- fPushTimeout : Cardinal;
- fPopTimeout : Cardinal;
- fTotalItemsPushed : Cardinal;
- fTotalItemsPopped : Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- // Raises the capacity of the underlying TQueue<T> by ADelta.
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- {$IFNDEF FPC}
- // Object list whose operations are serialised through a monitor on fLock
- // (Delphi only). All methods implemented in this unit operate on the inner
- // fList, not on the inherited TList<T> storage.
- // NOTE(review): the inherited TList<T> members are not overridden here, so
- // calls made through a TList<T> reference presumably bypass the lock - confirm.
- TThreadObjectList<T: class> = class(TList<T>)
- private
- // The list that actually stores the items.
- fList: TObjectList<T>;
- // Plain object used only as the monitor target for LockList/UnlockList.
- fLock: TObject;
- // Policy applied by Add when the item is already in the list.
- fDuplicates: TDuplicates;
- function GetItem(aIndex : Integer) : T;
- procedure SetItem(aIndex : Integer; aValue : T);
- public
- constructor Create(OwnedObjects : Boolean);
- destructor Destroy; override;
- property Items[Index : Integer] : T read GetItem write SetItem ; default;
- procedure Add(const Item: T);
- procedure Clear;
- // Enters the monitor and returns the inner list; pair every call with UnlockList.
- function LockList: TObjectList<T>;
- procedure Remove(const Item: T); inline;
- procedure RemoveItem(const Item: T; Direction: TDirection);
- procedure UnlockList; inline;
- property Duplicates: TDuplicates read fDuplicates write fDuplicates;
- end;
- {$ENDIF}
- {$IFDEF FPC}
- // FPC builds declare TProc locally as a parameterless method pointer.
- TProc = procedure of object;
- {$ENDIF}
- // Handle to a thread created from a procedure; OnTerminate returns the same
- // interface so calls can be chained before Start.
- IAnonymousThread = interface
- // Starts the wrapped thread.
- procedure Start;
- // Registers aProc to be called when the thread terminates.
- function OnTerminate(aProc : TProc) : IAnonymousThread;
- end;
- // IAnonymousThread implementation wrapping TThread.CreateAnonymousThread.
- // The constructor is private; instances are obtained through Execute.
- TAnonymousThread = class(TInterfacedObject,IAnonymousThread)
- private
- // Thread created by TThread.CreateAnonymousThread in the constructor.
- fThread : TThread;
- // Callback stored by OnTerminate and invoked from NotifyTerminate.
- fTerminateProc : TProc;
- constructor Create(aProc : TProc);
- // TNotifyEvent-compatible adapter assigned to fThread.OnTerminate.
- procedure NotifyTerminate(Sender : TObject);
- public
- // Factory: wraps aProc in a thread and returns its interface (not started yet).
- class function Execute(aProc : TProc) : IAnonymousThread;
- procedure Start;
- function OnTerminate(aProc : TProc) : IAnonymousThread;
- end;
- // Parameters captured for a task, one TFlexValue per argument.
- TParamArray = array of TFlexValue;
- // Life-cycle states reported by ITask.TaskStatus.
- TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException);
- // Scheduling mode of a scheduled task: a single run or repeated runs.
- TScheduleMode = (smRunOnce, smRepeatMode);
- // Unit in which a repeat interval is expressed.
- TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds);
- // Common contract of work and scheduled tasks: parameter access, status,
- // identity, and the hooks a worker calls while running the task.
- ITask = interface
- ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}']
- function GetParam(aIndex : Integer) : TFlexValue;
- function TaskStatus : TWorkTaskStatus;
- function GetNumWorker : Integer;
- procedure SetNumWorker(Value : Integer);
- function GetIdTask : Int64;
- procedure SetIdTask(Value : Int64);
- // Runs the task body.
- procedure DoExecute;
- // Reports an exception raised while the task was running.
- procedure DoException(aException : Exception);
- // Called by the worker after the task has run.
- procedure DoTerminate;
- property Param[index : Integer] : TFlexValue read GetParam;
- property NumWorker : Integer read GetNumWorker write SetNumWorker;
- property IdTask : Int64 read GetIdTask;
- function IsEnabled : Boolean;
- end;
- // Task callbacks: anonymous-method references on Delphi, method pointers on FPC.
- {$IFNDEF FPC}
- TTaskProc = reference to procedure(task : ITask);
- TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception);
- {$ELSE}
- TTaskProc = procedure(task : ITask) of object;
- TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object;
- {$ENDIF}
- // Task queued on a TBackgroundTasks pool. The On* setters return the task
- // itself so they can be chained.
- IWorkTask = interface(ITask)
- function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
- function OnTerminated(aTaskProc : TTaskProc) : IWorkTask;
- // Marks the task as enabled.
- procedure Run;
- end;
- // Task handled by the scheduler: adds a start date, repeat rules, expiration
- // and cancellation to ITask. The implementing methods are not part of the
- // code visible in this section.
- IScheduledTask = interface(ITask)
- ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}']
- function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask;
- function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask;
- function OnExpired(aTaskProc : TTaskProc) : IScheduledTask;
- function CheckSchedule : Boolean;
- procedure DoExpire;
- function GetTaskName : string;
- function StartAt(aStartDate : TDateTime) : IScheduledTask;
- procedure RunOnce;
- // Three repeat variants: unbounded, until aEndTime, or aRepeatTimes times.
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
- function IsFinished : Boolean;
- procedure Cancel;
- property Name : string read GetTaskName;
- end;
- // Base ITask implementation: stores the captured parameters, the callbacks
- // and the current status.
- TTask = class(TInterfacedObject,ITask)
- private
- fIdTask : Int64;
- fNumWorker : Integer;
- // Copy of the arguments passed to Create, one TFlexValue each.
- fParamArray : TParamArray;
- // Task body, run by DoExecute.
- fExecuteProc : TTaskProc;
- // Optional handlers used by DoException / DoTerminate.
- fExceptProc : TTaskExceptionProc;
- fTerminateProc : TTaskProc;
- fExpiredProc : TTaskProc;
- fTaskStatus : TWorkTaskStatus;
- // When True the destructor frees object-typed parameters.
- fOwnedParams : Boolean;
- // Starts False; set True by TWorkTask.Run.
- fEnabled : Boolean;
- function GetParam(aIndex : Integer) : TFlexValue;
- procedure DoExecute;
- procedure DoException(aException : Exception);
- procedure DoTerminate;
- function GetNumWorker : Integer;
- procedure SetNumWorker(Value : Integer);
- function GetIdTask : Int64;
- procedure SetIdTask(Value : Int64);
- public
- constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual;
- destructor Destroy; override;
- property IdTask : Int64 read GetIdTask;
- property OwnedParams : Boolean read fOwnedParams write fOwnedParams;
- function IsEnabled : Boolean;
- function TaskStatus : TWorkTaskStatus;
- end;
- // IWorkTask implementation; adds the chainable handler setters and Run to TTask.
- TWorkTask = class(TTask,IWorkTask)
- public
- function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual;
- function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual;
- procedure Run; virtual;
- end;
- // Queue type shared between TBackgroundTasks (producer) and its TWorker threads.
- TTaskQueue = TThreadedQueueCS<IWorkTask>;
- // IScheduledTask implementation. Its method bodies are outside the code
- // visible in this section, so the field notes below describe declarations only.
- TScheduledTask = class(TTask,IScheduledTask)
- private
- fName : string;
- fExecutionTimes : Integer;
- fScheduleMode : TScheduleMode;
- // Repeat interval, expressed in fTimeMeasure units.
- fTimeInterval : Integer;
- fTimeMeasure : TTimeMeasure;
- fStartDate : TDateTime;
- fLastExecution : TDateTime;
- fNextExecution : TDateTime;
- fExpirationDate : TDateTime;
- fExpirationTimes : Integer;
- fFinished : Boolean;
- procedure ClearSchedule;
- function CheckSchedule : Boolean;
- procedure DoExpire;
- function GetTaskName : string;
- public
- property Name : string read fName write fName;
- function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual;
- function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual;
- function StartAt(aStartDate : TDateTime) : IScheduledTask;
- procedure RunOnce;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime); overload;
- procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload;
- function IsFinished : Boolean;
- procedure Cancel;
- end;
- // State of a TWorker: suspended before Execute and after its loop ends,
- // idle between tasks, working while a task runs.
- TWorkerStatus = (wsIdle, wsWorking, wsSuspended);
- // Thread that pops IWorkTask items from a shared TTaskQueue and runs them.
- TWorker = class(TThread)
- private
-   // Number given at construction; identifies the worker inside its pool.
-   fNumWorker : Integer;
-   // Id of the task being executed. Declared Int64 to match ITask.GetIdTask;
-   // an Integer here would truncate large task ids.
-   fCurrentIdTask : Int64;
-   fCurrentTask : IWorkTask;
-   fStatus : TWorkerStatus;
-   // Queue shared with the owner; the worker does not free it.
-   fTaskQueue : TTaskQueue;
- public
-   constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
-   property NumWorker : Integer read fNumWorker;
-   property Status : TWorkerStatus read fStatus;
-   procedure Execute; override;
- end;
- // Worker bound to a single scheduled task instead of a queue. Its
- // implementation is outside the code visible in this section.
- TScheduledWorker = class(TWorker)
- private
- fTask : IScheduledTask;
- public
- constructor Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
- procedure Execute; override;
- end;
- // Container for the worker threads of a TBackgroundTasks instance.
- TWorkerPool = TObjectList<TWorker>;
- // Queue of work tasks plus a pool of TWorker threads that consume it.
- // Tasks are queued with AddTask; Start creates and launches the workers.
- TBackgroundTasks = class
- private
- // Capacity given to the task queue at construction.
- fMaxQueue : Integer;
- fWorkerPool : TWorkerPool;
- // Number of workers created by Start.
- fConcurrentWorkers : Integer;
- // Timeouts handed to the queue by the constructor.
- // NOTE(review): the queue is created with the constructor-time values; the
- // writable properties below do not appear to reach it afterwards - confirm.
- fInsertTimeout : Cardinal;
- fExtractTimeout : Cardinal;
- fTaskQueue : TTaskQueue;
- // Count of AddTask calls; also the source of task ids.
- fNumPushedTasks : Int64;
- function GetTaskQueue : Cardinal;
- public
- constructor Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
- destructor Destroy; override;
- property MaxQueue : Integer read fMaxQueue;
- property InsertTimeout : Cardinal read fInsertTimeout write fInsertTimeout;
- property ExtractTimeout : Cardinal read fExtractTimeout write fExtractTimeout;
- // Number of tasks currently waiting in the queue.
- property TaskQueued : Cardinal read GetTaskQueue;
- property NumPushedTasks : Int64 read fNumPushedTasks;
- property ConcurrentWorkers : Integer read fConcurrentWorkers write fConcurrentWorkers;
- function AddTask(aTaskProc : TTaskProc) : IWorkTask; overload;
- function AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask; overload;
- procedure Start;
- // Empties the task queue.
- procedure CancelAll;
- end;
- // List of scheduled tasks held through their interface references.
- TScheduledTaskList = TList<IScheduledTask>;
- // Thread that works on a TScheduledTaskList received at construction. Its
- // method bodies are outside the code visible in this section.
- TScheduler = class(TThread)
- private
- fListLock : TCriticalSection;
- fCondVar : TSimpleEvent;
- fTaskList : TScheduledTaskList;
- fRemoveTaskAfterExpiration : Boolean;
- public
- constructor Create(aTaskList : TScheduledTaskList);
- destructor Destroy; override;
- property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
- procedure Execute; override;
- function Add(aTask : TScheduledTask) : Integer;
- // Look-ups by task id or by task name.
- function Get(aIdTask : Int64) : IScheduledTask; overload;
- function Get(const aTaskName : string) : IScheduledTask; overload;
- end;
- // Front end for scheduled tasks: owns the task list and the scheduler thread.
- TScheduledTasks = class
- private
- fTaskList : TScheduledTaskList;
- // Not created by the constructor; Start checks whether it is assigned.
- fScheduler : TScheduler;
- // Count of AddTask calls; also the source of task ids.
- fNumPushedTasks : Int64;
- fRemoveTaskAfterExpiration : Boolean;
- fIsStarted : Boolean;
- public
- constructor Create;
- destructor Destroy; override;
- property NumPushedTasks : Int64 read fNumPushedTasks;
- property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration;
- property IsStarted : Boolean read fIsStarted;
- function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload;
- function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload;
- // Both look-ups are delegated to the scheduler.
- function GetTask(aIdTask : Int64) : IScheduledTask; overload;
- function GetTask(const aTaskName : string) : IScheduledTask; overload;
- procedure Start;
- procedure Stop;
- end;
- implementation
- { TThreadedQueueCS<T> }
- { Discards every queued item and wakes all waiting threads.
-   Items are freed only when T is exactly TObject. The buffer is released and
-   re-allocated with the same capacity, and the size/offset counters are
-   reset. Without that reset the next PopItem would index an empty array and
-   the next PushItem would compute "mod 0". }
- procedure TThreadedQueueCS<T>.Clear;
- var
-   obj : T;
-   queueCapacity : Integer;
- begin
-   FQueueLock.Enter;
-   try
-     for obj in FQueue do
-     begin
-       if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF}
-     end;
-     queueCapacity := Length(FQueue);
-     // Dropping to length 0 finalizes managed elements (interfaces, strings).
-     SetLength(FQueue,0);
-     // Restore the capacity so later pushes have somewhere to go.
-     SetLength(FQueue,queueCapacity);
-     FQueueSize := 0;
-     FQueueOffset := 0;
-   finally
-     FQueueLock.Leave;
-   end;
-   // Producers blocked on a full queue can continue now.
-   {$IFDEF FPC}
-   FQueueCondVar.SetEvent;
-   {$ELSE}
-   FQueueCondVar.ReleaseAll;
-   {$ENDIF}
- end;
- { Builds an empty queue.
-   AQueueDepth : number of slots in the circular buffer.
-   PushTimeout : maximum wait, in milliseconds, of a push on a full queue.
-   PopTimeout  : maximum wait, in milliseconds, of a pop on an empty queue.
-   NOTE(review): the FPC branch creates a manual-reset event with the fixed
-   name 'TQCS'; check whether that name is shared between instances. }
- constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
-   inherited Create;
-   // buffer first, then the plain settings
-   SetLength(FQueue, AQueueDepth);
-   FPopTimeout := PopTimeout;
-   FPushTimeout := PushTimeout;
-   // synchronisation objects
-   FQueueLock := TCriticalSection.Create;
-   {$IFNDEF FPC}
-   FQueueCondVar := TConditionVariableCS.Create;
-   {$ELSE}
-   FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
-   {$ENDIF}
- end;
- // Flags shutdown and wakes waiters, then frees the lock and the wake-up object.
- // NOTE(review): nothing here waits for woken threads to leave PushItem/PopItem
- // before the lock is freed - confirm callers stop their threads first.
- destructor TThreadedQueueCS<T>.Destroy;
- begin
- DoShutDown;
- FQueueLock.Free;
- FQueueCondVar.Free;
- inherited;
- end;
- // Resizes the buffer to Length(FQueue) + ADelta under the lock, then wakes
- // all waiters so blocked producers can re-check for free space.
- procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
- begin
- FQueueLock.Enter;
- try
- SetLength(FQueue, Length(FQueue) + ADelta);
- finally
- FQueueLock.Leave;
- end;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- // Pops the oldest item and returns it; the wait result and the remaining
- // size reported by the full overload are discarded.
- function TThreadedQueueCS<T>.PopItem: T;
- var
-   unusedSize: Integer;
- begin
-   PopItem(unusedSize, Result);
- end;
- // Core pop. Waits while the queue is empty, up to FPopTimeout per wait, then
- // removes the oldest item into AItem.
- //   AQueueSize : always receives the item count on exit (set in finally).
- //   AItem      : the popped item, or Default(T) when nothing was popped.
- //   Result     : wrSignaled when an item was popped, and also when the queue
- //                is shut down and empty (AItem is Default(T) then); otherwise
- //                the non-signaled result of the wait, e.g. a timeout.
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
- begin
- // NOTE(review): the FPC branch waits on the event while FQueueLock is still
- // entered, and no ResetEvent appears in this unit - verify that producers
- // are not blocked out and that the loop does not spin.
- {$IFDEF FPC}
- Result := FQueueCondVar.WaitFor(FPopTimeout);
- {$ELSE}
- Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
- {$ENDIF}
- end;
- if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := FQueue[FQueueOffset];
- // Clear the slot so the buffer no longer holds the popped value.
- FQueue[FQueueOffset] := Default(T);
- // The queue was full: wake waiters so blocked producers can continue.
- if FQueueSize = Length(FQueue) then
- begin
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- Dec(FQueueSize);
- Inc(FQueueOffset);
- Inc(FTotalItemsPopped);
- // Wrap the read index around the end of the circular buffer.
- if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- // Pops into AItem and returns the wait result; the remaining queue size
- // reported by the full overload is not exposed.
- function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
- var
-   unusedSize: Integer;
- begin
-   Result := PopItem(unusedSize, AItem);
- end;
- // Pops and returns the oldest item, reporting the remaining item count in
- // AQueueSize. The wait result is discarded.
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- // Pushes AItem and returns the wait result; the resulting queue size
- // reported by the full overload is not exposed.
- function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
- var
-   unusedSize: Integer;
- begin
-   Result := PushItem(AItem, unusedSize);
- end;
- // Core push. Waits while the buffer is full, up to FPushTimeout per wait,
- // then stores AItem behind the newest item.
- //   AQueueSize : always receives the item count on exit (set in finally).
- //   Result     : wrSignaled when the item was stored; the non-signaled wait
- //                result when the wait failed or timed out.
- // NOTE(review): after shutdown the method exits with Result still wrSignaled
- // although nothing was stored - callers cannot tell the two apart.
- function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
- begin
- // NOTE(review): the FPC branch waits on the event while FQueueLock is still
- // entered - verify consumers can take the lock to free a slot.
- {$IFDEF FPC}
- Result := FQueueCondVar.WaitFor(FPushTimeout);
- {$ELSE}
- Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
- {$ENDIF}
- end;
- if FShutDown or (Result <> wrSignaled) then Exit;
- // The queue was empty: wake waiters so blocked consumers can continue.
- if FQueueSize = 0 then
- begin
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- // Write position = oldest index + item count, wrapped around the buffer.
- FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
- Inc(FQueueSize);
- Inc(FTotalItemsPushed);
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- // Marks the queue as shut down and wakes every waiting thread so blocked
- // push/pop calls can return. The flag is written without taking FQueueLock.
- procedure TThreadedQueueCS<T>.DoShutDown;
- begin
- FShutDown := True;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- { TThreadedQueueList<T> }
- { Builds an empty queue.
-   AQueueDepth : initial capacity of the underlying TQueue<T>.
-   PushTimeout : stored in fPushTimeout.
-   PopTimeout  : maximum wait, in milliseconds, of a pop on an empty queue. }
- constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
-   inherited Create;
-   // plain settings
-   fQueueSize := 0;
-   fPopTimeout := PopTimeout;
-   fPushTimeout := PushTimeout;
-   // storage
-   fQueue := TQueue<T>.Create;
-   fQueue.Capacity := AQueueDepth;
-   // synchronisation objects
-   fQueueLock := TCriticalSection.Create;
-   {$IFNDEF FPC}
-   fQueueCondVar := TConditionVariableCS.Create;
-   {$ELSE}
-   FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
-   {$ENDIF}
- end;
- // Flags shutdown and wakes waiters, then frees the lock, the wake-up object
- // and the underlying queue.
- destructor TThreadedQueueList<T>.Destroy;
- begin
- DoShutDown;
- fQueueLock.Free;
- fQueueCondVar.Free;
- fQueue.Free;
- inherited;
- end;
- // Adds ADelta to the capacity of the underlying queue under the lock, then
- // wakes all waiting threads.
- procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
- begin
- fQueueLock.Enter;
- try
- fQueue.Capacity := fQueue.Capacity + ADelta;
- finally
- fQueueLock.Leave;
- end;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- // Pops the oldest item and returns it; the wait result and the remaining
- // size reported by the full overload are discarded.
- function TThreadedQueueList<T>.PopItem: T;
- var
-   unusedSize: Integer;
- begin
-   PopItem(unusedSize, Result);
- end;
- // Core pop, in one variant per compiler. Both wait while the queue is empty,
- // up to FPopTimeout per wait, extract the oldest item into AItem and report
- // the remaining count in AQueueSize. AItem is Default(T) when nothing was
- // popped. Result is wrSignaled when an item was popped, and also when the
- // queue is shut down and empty.
- {$IFDEF FPC}
- // FPC variant: waits on the event BEFORE taking the lock.
- // NOTE(review): fQueueSize is tested without the lock, and the FPC PushItem
- // in this unit has its SetEvent calls commented out, so a waiting consumer
- // appears to be woken only by Grow, DoShutDown or a timeout - confirm.
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- //var
- //crono : TChronometer;
- begin
- AItem := Default(T);
- //crono := TChronometer.Create(False);
- try
- Result := wrSignaled;
- //writeln('popitem');
- //crono.Start;
- while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
- begin
- //crono.Start;
- Result := FQueueCondVar.WaitFor(FPopTimeout);
- //crono.Stop;
- //writeln('in: ' + crono.ElapsedTime);
- //if result = twaitresult.wrError then result := twaitresult.wrError;
- end;
- //crono.Stop;
- //writeln('out: ' + crono.ElapsedTime);
- fQueueLock.Enter;
- try
- if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := fQueue.Extract;
- Dec(FQueueSize);
- Inc(fTotalItemsPopped);
- finally
- fQueueLock.Leave;
- end;
- finally
- AQueueSize := fQueueSize;
- end;
- end;
- {$ELSE}
- // Delphi variant: waits on the condition variable while holding the lock.
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- fQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
- begin
- Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
- end;
- if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := fQueue.Extract;
- // NOTE(review): this compares after Extract but before Dec(FQueueSize), so
- // it presumably never holds while fQueueSize mirrors fQueue.Count - confirm.
- if fQueueSize = fQueue.Count then
- begin
- FQueueCondVar.ReleaseAll;
- end;
- Dec(FQueueSize);
- Inc(fTotalItemsPopped);
- finally
- AQueueSize := fQueueSize;
- fQueueLock.Leave;
- end;
- end;
- {$ENDIF}
- // Pops into AItem and returns the wait result; the remaining queue size
- // reported by the full overload is not exposed.
- function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
- var
-   unusedSize: Integer;
- begin
-   Result := PopItem(unusedSize, AItem);
- end;
- // Pops and returns the oldest item, reporting the remaining item count in
- // AQueueSize. The wait result is discarded.
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- // Pushes AItem and returns the wait result; the resulting queue size
- // reported by the full overload is not exposed.
- function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
- var
-   unusedSize: Integer;
- begin
-   Result := PushItem(AItem, unusedSize);
- end;
- // Core push, in one variant per compiler. Neither variant waits: the
- // wait-for-space loop is commented out, so Result is always wrSignaled. The
- // item is enqueued under the lock unless the queue has been shut down, in
- // which case nothing is stored. AQueueSize receives the item count on exit.
- {$IFDEF FPC}
- // FPC variant.
- // NOTE(review): every SetEvent call is commented out here, so this variant
- // does not signal consumers waiting in PopItem - confirm that is intended.
- function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
- //begin
- // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
- //end;
- if fShutDown or (Result <> wrSignaled) then Exit;
- //if fQueueSize = 0 then
- //begin
- // FQueueCondVar.SetEvent;
- //end;
- fQueue.Enqueue(AItem);
- Inc(FQueueSize);
- Inc(fTotalItemsPushed);
- finally
- AQueueSize := fQueueSize;
- FQueueLock.Leave;
- //FQueueCondVar.SetEvent;
- end;
- end;
- {$ELSE}
- // Delphi variant: wakes all waiters when the queue was empty before this push.
- function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
- //begin
- // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
- //end;
- if fShutDown or (Result <> wrSignaled) then Exit;
- if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
- fQueue.Enqueue(AItem);
- Inc(FQueueSize);
- Inc(fTotalItemsPushed);
- finally
- AQueueSize := fQueueSize;
- FQueueLock.Leave;
- end;
- end;
- {$ENDIF}
- // Marks the queue as shut down and wakes every waiting thread. The flag is
- // written without taking fQueueLock.
- procedure TThreadedQueueList<T>.DoShutDown;
- begin
- fShutDown := True;
- {$IFDEF FPC}
- FQueueCondVar.SetEvent;
- {$ELSE}
- FQueueCondVar.ReleaseAll;
- {$ENDIF}
- end;
- {$IFNDEF FPC}
- { TThreadObjectList<T> }
- // Adds Item under the list lock, honouring the Duplicates policy:
- //   dupAccept : always added, no look-up is made.
- //   dupIgnore : added only when not already present.
- //   dupError  : raises EListError when already present.
- procedure TThreadObjectList<T>.Add(const Item: T);
- var
-   alreadyPresent : Boolean;
- begin
-   LockList;
-   try
-     // short-circuit keeps IndexOf from running under dupAccept
-     alreadyPresent := (Duplicates <> dupAccept) and (fList.IndexOf(Item) <> -1);
-     if not alreadyPresent then
-       fList.Add(Item)
-     else if Duplicates = dupError then
-       raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
-   finally
-     UnlockList;
-   end;
- end;
- // Empties the inner list while holding the list lock.
- procedure TThreadObjectList<T>.Clear;
- begin
- LockList;
- try
- fList.Clear;
- finally
- UnlockList;
- end;
- end;
- { Creates the lock object and the inner list.
-   OwnedObjects : when True the inner list owns, and therefore frees, the
-                  objects it holds. The argument is forwarded to the inner
-                  list; it used to be ignored, which left the list owning its
-                  objects even when False was requested.
-   Duplicates starts as dupIgnore. }
- constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
- begin
-   inherited Create;
-   fLock := TObject.Create;
-   fList := TObjectList<T>.Create(OwnedObjects);
-   fDuplicates := dupIgnore;
- end;
- // Frees the inner list and runs the inherited destructor while holding the
- // lock; the lock object itself is released last, after the monitor is exited.
- destructor TThreadObjectList<T>.Destroy;
- begin
- LockList;
- try
- fList.Free;
- inherited Destroy;
- finally
- UnlockList;
- fLock.Free;
- end;
- end;
- // Getter of the default Items property: reads fList[aIndex] under the lock.
- function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
- begin
- LockList;
- try
- Result := fList[aIndex];
- finally
- UnlockList;
- end;
- end;
- // Enters the monitor on fLock and returns the inner list. The caller must
- // call UnlockList when done.
- function TThreadObjectList<T>.LockList: TObjectList<T>;
- begin
- System.TMonitor.Enter(fLock);
- Result := fList;
- end;
- // Removes Item, searching from the beginning of the list.
- procedure TThreadObjectList<T>.Remove(const Item: T);
- begin
- RemoveItem(Item, TDirection.FromBeginning);
- end;
- // Removes Item from the inner list under the lock, searching in the given
- // Direction.
- procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
- begin
- LockList;
- try
- fList.RemoveItem(Item, Direction);
- finally
- UnlockList;
- end;
- end;
- // Setter of the default Items property: writes fList[aIndex] under the lock.
- procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
- begin
- LockList;
- try
- fList[aIndex] := aValue;
- finally
- UnlockList;
- end;
- end;
- // Exits the monitor entered by LockList.
- procedure TThreadObjectList<T>.UnlockList;
- begin
- System.TMonitor.Exit(fLock);
- end;
- {$ENDIF}
- { TAnonymousThread }
- // Wraps aProc in a thread obtained from TThread.CreateAnonymousThread. The
- // thread is only created here; Start launches it. The FPC branch passes the
- // procedure through the @ operator.
- constructor TAnonymousThread.Create(aProc : TProc);
- begin
- {$IFNDEF FPC}
- fThread := TThread.CreateAnonymousThread(aProc);
- {$ELSE}
- fThread := TThread.CreateAnonymousThread(@aProc);
- {$ENDIF}
- end;
- // Factory: builds a TAnonymousThread around aProc and returns it as an
- // interface. The thread does not run until Start is called.
- class function TAnonymousThread.Execute(aProc: TProc): IAnonymousThread;
- begin
- Result := TAnonymousThread.Create(aProc);
- end;
- // OnTerminate adapter for the wrapped thread: forwards the notification to
- // the callback stored by OnTerminate. The callback is checked first, because
- // OnTerminate(nil) still wires this handler and would otherwise call nil.
- procedure TAnonymousThread.NotifyTerminate(Sender: TObject);
- begin
-   if Assigned(fTerminateProc) then fTerminateProc;
- end;
- // Stores aProc as the termination callback, hooks NotifyTerminate into the
- // thread's OnTerminate event and returns Self so calls can be chained.
- function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread;
- begin
- Result := Self;
- fTerminateProc := aProc;
- fThread.OnTerminate := Self.NotifyTerminate;
- end;
- // Starts the wrapped thread.
- procedure TAnonymousThread.Start;
- begin
- fThread.Start;
- end;
- { TTask }
- { Builds a pending, disabled task.
-   aParamArray  : values copied into fParamArray, one TFlexValue per entry.
-   aOwnedParams : when True the destructor frees object-typed parameters.
-   aTaskProc    : body executed by DoExecute. }
- constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc);
- var
-   idx : Integer;
- begin
-   fTaskStatus := TWorkTaskStatus.wtsPending;
-   fOwnedParams := aOwnedParams;
-   // one slot per incoming argument
-   SetLength(fParamArray,Length(aParamArray));
-   for idx := 0 to High(aParamArray) do
-   begin
-     fParamArray[idx].Create(aParamArray[idx]);
-     {$IFDEF FPC}
-     fParamArray[idx]._AddRef;
-     {$ENDIF}
-   end;
-   fEnabled := False;
-   fExecuteProc := aTaskProc;
- end;
- // Releases the parameter array. When the task owns its parameters, every
- // parameter holding a non-nil object has that object freed first.
- // NOTE(review): on FPC the constructor calls _AddRef on every parameter, but
- // _Release is only reached here when fOwnedParams is True - confirm the
- // counts balance for non-owned parameters.
- destructor TTask.Destroy;
- var
- i : Integer;
- begin
- //free passed params
- if fOwnedParams then
- begin
- for i := Low(fParamArray) to High(fParamArray) do
- begin
- {$IFDEF FPC}
- fParamArray[i]._Release;
- {$ENDIF}
- if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free;
- end;
- end;
- fParamArray := nil;
- inherited;
- end;
- // Marks the task as failed and hands aException to the registered handler.
- // Without a handler the exception object is raised again.
- // NOTE(review): TWorker.Execute calls this from inside its except block, so
- // "raise aException" re-raises the object that handler is still holding;
- // verify this cannot free the exception twice.
- procedure TTask.DoException(aException : Exception);
- begin
- fTaskStatus := TWorkTaskStatus.wtsException;
- if Assigned(fExceptProc) then fExceptProc(Self,aException)
- else raise aException;
- end;
- // Runs the task body, if one is assigned. The status is wtsRunning during
- // the call and wtsDone after it; an exception raised by the body leaves the
- // status at wtsRunning here.
- procedure TTask.DoExecute;
- begin
- fTaskStatus := TWorkTaskStatus.wtsRunning;
- if Assigned(fExecuteProc) then fExecuteProc(Self);
- fTaskStatus := TWorkTaskStatus.wtsDone;
- end;
- // Calls the termination handler, if one was registered.
- procedure TTask.DoTerminate;
- begin
- if Assigned(fTerminateProc) then fTerminateProc(Self);
- end;
- // Returns the id stored by SetIdTask.
- function TTask.GetIdTask: Int64;
- begin
- Result := fIdTask;
- end;
- // Stores the task id; the AddTask methods in this unit pass their running
- // task counter.
- procedure TTask.SetIdTask(Value : Int64);
- begin
- fIdTask := Value;
- end;
- // Returns the worker number stored by SetNumWorker.
- function TTask.GetNumWorker: Integer;
- begin
- Result := fNumWorker;
- end;
- // Returns the parameter at aIndex (0-based); no bounds check is made here.
- function TTask.GetParam(aIndex: Integer): TFlexValue;
- begin
- Result := fParamArray[aIndex];
- end;
- // Returns the enabled flag (False after construction).
- function TTask.IsEnabled: Boolean;
- begin
- Result := fEnabled;
- end;
- // Stores the worker number and, as a side effect, sets the status to
- // wtsAssigned.
- procedure TTask.SetNumWorker(Value: Integer);
- begin
- fTaskStatus := TWorkTaskStatus.wtsAssigned;
- fNumWorker := Value;
- end;
- // Returns the current life-cycle status of the task.
- function TTask.TaskStatus: TWorkTaskStatus;
- begin
- Result := fTaskStatus;
- end;
- { TWorkTask }
- // Registers the handler used by DoException and returns the task itself so
- // further setters can be chained.
- function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask;
- begin
-   Result := Self;
-   fExceptProc := aTaskProc;
- end;
- // Registers the handler used by DoTerminate and returns the task itself so
- // further setters can be chained.
- function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask;
- begin
-   Result := Self;
-   fTerminateProc := aTaskProc;
- end;
- // Sets the enabled flag. It does not execute anything by itself.
- procedure TWorkTask.Run;
- begin
- fEnabled := True;
- end;
- { TBackgroundTasks }
- // Drops every task still waiting in the queue by clearing it.
- procedure TBackgroundTasks.CancelAll;
- begin
- fTaskQueue.Clear;
- end;
- { Creates the task queue; worker threads are created later by Start.
-   aConcurrentWorkers : number of workers Start will launch.
-   aMaxQueue          : capacity of the task queue.
-   Inserting waits without limit; extracting waits at most 5000 ms. }
- constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : Integer = 100);
- begin
-   fNumPushedTasks := 0;
-   fConcurrentWorkers := aConcurrentWorkers;
-   fMaxQueue := aMaxQueue;
-   // queue timeouts
-   fExtractTimeout := 5000;
-   fInsertTimeout := INFINITE;
-   fTaskQueue := TThreadedQueueCS<IWorkTask>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
- end;
- // Frees the worker pool first and the task queue after it. Free is safe on
- // a nil reference, so no explicit checks are needed.
- destructor TBackgroundTasks.Destroy;
- begin
-   fWorkerPool.Free;
-   fTaskQueue.Free;
-   inherited;
- end;
- // Getter of TaskQueued: number of tasks currently waiting in the queue.
- function TBackgroundTasks.GetTaskQueue: Cardinal;
- begin
- Result := fTaskQueue.QueueSize;
- end;
- // Queues a task that takes no parameters; forwards to the full overload
- // with an empty parameter list and aOwnedParams = False.
- function TBackgroundTasks.AddTask(aTaskProc : TTaskProc) : IWorkTask;
- begin
- Result := AddTask([],False,aTaskProc);
- end;
- { Creates a work task, assigns it the next task id and pushes it on the queue.
-   aParamArray  : values made available to the task through Param[].
-   aOwnedParams : when True the task frees object parameters on destruction.
-   aTaskProc    : body executed by a worker.
-   Returns the queued task, or nil when the push did not report wrSignaled.
-   Result is cleared up front: an interface-typed function result is not
-   guaranteed to start as nil, so the failure path must not leave it unset. }
- function TBackgroundTasks.AddTask(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IWorkTask;
- var
-   worktask : IWorkTask;
- begin
-   Result := nil;
-   worktask := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc);
-   // The counter doubles as the task id; it advances even when the push fails.
-   Inc(fNumPushedTasks);
-   worktask.SetIdTask(fNumPushedTasks);
-   if fTaskQueue.PushItem(worktask) = TWaitResult.wrSignaled then
-   begin
-     Result := worktask;
-   end;
- end;
- // Builds a fresh worker pool (freeing any previous one) and starts
- // fConcurrentWorkers threads, numbered from 1, all reading the shared queue.
- procedure TBackgroundTasks.Start;
- var
-   workerNum : Integer;
-   newWorker : TWorker;
- begin
-   // Free is nil-safe, so an unassigned pool needs no check
-   fWorkerPool.Free;
-   fWorkerPool := TWorkerPool.Create(True);
-   for workerNum := 1 to fConcurrentWorkers do
-   begin
-     newWorker := TWorker.Create(workerNum,fTaskQueue);
-     fWorkerPool.Add(newWorker);
-     newWorker.Start;
-   end;
- end;
- { TWorker }
- { Creates the worker thread suspended.
-   aNumWorker : number identifying this worker.
-   aTaskQueue : shared queue the worker pops tasks from (not owned).
-   The thread is not freed automatically when it terminates. }
- constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue);
- begin
-   inherited Create(True);
-   FreeOnTerminate := False;
-   fTaskQueue := aTaskQueue;
-   fNumWorker := aNumWorker;
-   fStatus := TWorkerStatus.wsSuspended;
- end;
// Thread body: pops tasks from the queue and runs them until the thread is
// terminated or the queue reports no pending items. Exceptions raised by a
// task are handed to the task's DoException; DoTerminate is always called
// once a task has been popped.
procedure TWorker.Execute;
begin
  fStatus := TWorkerStatus.wsIdle;
  while not Terminated do
  begin
    // the worker ends as soon as the queue is seen empty
    if not (fTaskQueue.QueueSize > 0) then Break;
    fCurrentTask := fTaskQueue.PopItem;
    // nothing was obtained from the queue: re-check the loop conditions
    if fCurrentTask = nil then Continue;
    try
      fStatus := TWorkerStatus.wsWorking;
      try
        fCurrentIdTask := fCurrentTask.GetIdTask;
        fCurrentTask.DoExecute;
      except
        on E : Exception do
        begin
          if fCurrentTask = nil then raise Exception.Create(e.Message);
          fCurrentTask.DoException(E);
        end;
      end;
    finally
      fCurrentTask.DoTerminate;
      fStatus := TWorkerStatus.wsIdle;
    end;
  end;
  fStatus := TWorkerStatus.wsSuspended
end;
- { TScheduledTasks }
// Convenience overload: registers a named task with an empty, non-owned
// parameter array by delegating to the full AddTask overload.
function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask;
begin
  Result := AddTask(aTaskName,[],False,aTaskProc);
end;
// Creates a scheduled task, gives it the next task id and the supplied name,
// appends it to fTaskList and returns it.
//   aTaskName    : name assigned to the task
//   aParamArray  : parameters forwarded to the TScheduledTask constructor
//   aOwnedParams : forwarded to the TScheduledTask constructor
//   aTaskProc    : procedure the task will execute
function TScheduledTasks.AddTask(const aTaskName : string; aParamArray: array of const; aOwnedParams : Boolean; aTaskProc: TTaskProc): IScheduledTask;
var
  newTask : TScheduledTask;
begin
  newTask := TScheduledTask.Create(aParamArray,aOwnedParams,aTaskProc);
  Inc(fNumPushedTasks);
  newTask.SetIdTask(fNumPushedTasks);
  newTask.Name := aTaskName;
  fTaskList.Add(newTask);
  Result := newTask;
end;
// Creates the (initially empty) task list. The scheduler thread itself is
// created later, by Start.
constructor TScheduledTasks.Create;
begin
  fTaskList := TScheduledTaskList.Create;
  fIsStarted := False;
  fNumPushedTasks := 0;
end;
// Stops and releases the scheduler thread (if one was created), then frees
// the task list.
destructor TScheduledTasks.Destroy;
begin
  if fScheduler <> nil then
  begin
    // ask the thread to end and wait for it before releasing it
    fScheduler.Terminate;
    fScheduler.WaitFor;
    fScheduler.Free;
  end;
  // Free is a no-op on nil
  fTaskList.Free;
  inherited;
end;
// Looks up a task by its id.
//   aIdTask : id to search for
// Returns the matching task, or nil when no task has that id.
function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask;
var
  task : IScheduledTask;
begin
  Result := nil;
  if Assigned(fScheduler) then
  begin
    Result := fScheduler.Get(aIdTask);
    Exit;
  end;
  // The scheduler only exists after Start. Before that, calling through
  // fScheduler would dereference nil, so search the list directly.
  for task in fTaskList do
  begin
    if task.IdTask = aIdTask then Exit(task);
  end;
end;
// Looks up a task by name (case-insensitive comparison via CompareText).
//   aTaskName : name to search for
// Returns the matching task, or nil when no task has that name.
function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask;
var
  task : IScheduledTask;
begin
  Result := nil;
  if Assigned(fScheduler) then
  begin
    Result := fScheduler.Get(aTaskName);
    Exit;
  end;
  // The scheduler only exists after Start. Before that, calling through
  // fScheduler would dereference nil, so search the list directly.
  for task in fTaskList do
  begin
    if CompareText(task.Name,aTaskName) = 0 then Exit(task);
  end;
end;
// Starts the scheduler thread. Does nothing when scheduling is already
// running. Can be called again after Stop.
procedure TScheduledTasks.Start;
begin
  if fIsStarted then Exit;
  // A scheduler that is still assigned at this point was terminated by Stop.
  // A terminated thread cannot be started again, so wait for it to end,
  // release it and build a fresh one.
  if Assigned(fScheduler) then
  begin
    fScheduler.Terminate;
    fScheduler.WaitFor;
    fScheduler.Free;
    fScheduler := nil;
  end;
  fScheduler := TScheduler.Create(fTaskList);
  fScheduler.RemoveTaskAfterExpiration := fRemoveTaskAfterExpiration;
  fScheduler.Start;
  fIsStarted := True;
end;
// Requests termination of the scheduler thread (if any) and clears the
// started flag. Does not wait for the thread to finish.
procedure TScheduledTasks.Stop;
begin
  fIsStarted := False;
  if fScheduler <> nil then fScheduler.Terminate;
end;
- { TScheduledTask }
// Resets the schedule and sets aStartDate as both the start date and the
// next execution time, in run-once mode. Returns this task for chaining.
function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask;
begin
  ClearSchedule;
  fStartDate := aStartDate;
  fNextExecution := aStartDate;
  fScheduleMode := TScheduleMode.smRunOnce;
  Result := Self;
end;
// Puts the task in repeat mode, to be re-run every aInterval units of
// aTimeMeasure, and enables it. A start date set beforehand is kept;
// otherwise the schedule is reset and the task starts now.
procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure);
begin
  if fStartDate = 0.0 then
  begin
    // no start date configured: begin from a clean schedule, starting now
    ClearSchedule;
    fStartDate := Now();
  end;
  fScheduleMode := TScheduleMode.smRepeatMode;
  fTimeInterval := aInterval;
  fTimeMeasure := aTimeMeasure;
  fNextExecution := fStartDate;
  fEnabled := True;
end;
// Puts the task in repeat mode, to be re-run every aInterval units of
// aTimeMeasure, with aEndTime stored as the expiration date, and enables it.
// A start date set beforehand is kept; otherwise the schedule is reset and
// the task starts now.
procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aEndTime : TDateTime);
begin
  if fStartDate = 0.0 then
  begin
    // no start date configured: begin from a clean schedule, starting now
    ClearSchedule;
    fStartDate := Now();
  end;
  fScheduleMode := TScheduleMode.smRepeatMode;
  fTimeInterval := aInterval;
  fTimeMeasure := aTimeMeasure;
  fExpirationDate := aEndTime;
  fNextExecution := fStartDate;
  fEnabled := True;
end;
// Puts the task in repeat mode, to be re-run every aInterval units of
// aTimeMeasure, with aRepeatTimes stored as the execution-count limit, and
// enables it. A start date set beforehand is kept; otherwise the schedule is
// reset and the task starts now.
procedure TScheduledTask.RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer);
begin
  if fStartDate = 0.0 then
  begin
    // no start date configured: begin from a clean schedule, starting now
    ClearSchedule;
    fStartDate := Now();
  end;
  fScheduleMode := TScheduleMode.smRepeatMode;
  fTimeInterval := aInterval;
  fTimeMeasure := aTimeMeasure;
  fExpirationTimes := aRepeatTimes;
  fNextExecution := fStartDate;
  fEnabled := True;
end;
// Schedules the task for a single execution and enables it. A start date
// configured beforehand (e.g. through StartAt) is honoured; without one the
// schedule is reset and the task becomes due immediately.
procedure TScheduledTask.RunOnce;
begin
  // Only reset when no start date is set: ClearSchedule zeroes fStartDate, so
  // calling it unconditionally would throw away a preceding StartAt and make
  // the task run right now instead of at the requested time.
  if fStartDate = 0.0 then ClearSchedule;
  fScheduleMode := TScheduleMode.smRunOnce;
  if fStartDate = 0.0 then fStartDate := Now();
  fNextExecution := fStartDate;
  fEnabled := True;
end;
// Marks the task as finished (sets fFinished), which is what IsFinished
// reports back.
procedure TScheduledTask.Cancel;
begin
  fFinished := True;
end;
// Decides whether the task has to be launched now.
// Returns True when the task is due; in that case the last-execution time and
// the execution counter are updated and, in repeat mode, the next execution
// time is advanced by one interval. Returns False when the task is currently
// running, is not due yet, or has hit one of its expiration limits (the
// latter also sets fFinished).
function TScheduledTask.CheckSchedule: Boolean;
var
  isDue : Boolean;
  isExpired : Boolean;
begin
  Result := False;
  // never launch a task that is still running
  if fTaskStatus = TWorkTaskStatus.wtsRunning then Exit;

  if fScheduleMode = TScheduleMode.smRunOnce then
  begin
    // due only if it has never run and the start date was reached
    isDue := (fExecutionTimes = 0) and (Now() >= fNextExecution);
  end
  else
  begin
    isDue := Now() >= fNextExecution;
    if isDue then
    begin
      //check expiration limits
      isExpired := ((fExpirationTimes > 0) and (fExecutionTimes > fExpirationTimes)) or
                   ((fExpirationDate > 0.0) and (fNextExecution >= fExpirationDate));
      if isExpired then
      begin
        fFinished := True;
        Exit;
      end;
      //calculate next execution
      case fTimeMeasure of
        tmDays : fNextExecution := IncDay(fNextExecution,fTimeInterval);
        tmHours : fNextExecution := IncHour(fNextExecution,fTimeInterval);
        tmMinutes : fNextExecution := IncMinute(fNextExecution,fTimeInterval);
        tmSeconds : fNextExecution := IncSecond(fNextExecution,fTimeInterval);
      end;
    end;
  end;

  if isDue then
  begin
    fLastExecution := Now();
    Inc(fExecutionTimes);
    Result := True;
  end;
end;
// Resets the scheduling state after running the inherited ClearSchedule:
// dates are zeroed, the finished flag is cleared and the mode goes back to
// run-once with a zero interval measured in seconds.
procedure TScheduledTask.ClearSchedule;
begin
  inherited;
  // dates
  fStartDate := 0.0;
  fNextExecution := 0.0;
  fLastExecution := 0.0;
  fExpirationDate := 0.0;
  // mode and interval
  fScheduleMode := TScheduleMode.smRunOnce;
  fTimeInterval := 0;
  fTimeMeasure := TTimeMeasure.tmSeconds;
  fFinished := False;
end;
// Invokes the expiration callback, when one was registered, and then
// disables the task.
procedure TScheduledTask.DoExpire;
begin
  if Assigned(fExpiredProc) then
  begin
    fExpiredProc(Self);
  end;
  fEnabled := False;
end;
// Returns the task name held in fName.
function TScheduledTask.GetTaskName: string;
begin
  Result := fName;
end;
// Returns the fFinished flag, which Cancel and CheckSchedule (on expiration)
// set to True.
function TScheduledTask.IsFinished: Boolean;
begin
  Result := fFinished;
end;
// Stores aTaskProc in fExceptProc and returns this task so that further
// configuration calls can be chained on the result.
function TScheduledTask.OnException(aTaskProc: TTaskExceptionProc): IScheduledTask;
begin
  Result := Self;
  fExceptProc := aTaskProc;
end;
// Stores aTaskProc as the callback DoExpire invokes, and returns this task so
// that further configuration calls can be chained on the result.
function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask;
begin
  Result := Self;
  fExpiredProc := aTaskProc;
end;
// Stores aTaskProc in fTerminateProc and returns this task so that further
// configuration calls can be chained on the result.
function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask;
begin
  Result := Self;
  fTerminateProc := aTaskProc;
end;
- { TScheduledWorker }
// Creates a worker bound to one scheduled task. No task queue is used (nil is
// passed to the inherited constructor) and the thread object releases itself
// when it terminates.
//   aNumWorker     : number identifying this worker
//   aScheduledTask : task this worker will execute
constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask);
begin
  inherited Create(aNumWorker,nil);
  fTask := aScheduledTask;
  FreeOnTerminate := True;
end;
// Thread body: runs the bound task once. An exception raised by the task is
// handed to the task's DoException; DoTerminate is always called when a task
// is bound. The task reference is dropped at the end.
procedure TScheduledWorker.Execute;
begin
  fStatus := TWorkerStatus.wsIdle;
  if fTask <> nil then
  try
    fStatus := TWorkerStatus.wsWorking;
    try
      fTask.DoExecute;
      fStatus := TWorkerStatus.wsIdle;
    except
      on E : Exception do
      begin
        if fTask = nil then raise Exception.Create(e.Message);
        fTask.DoException(E);
      end;
    end;
  finally
    fTask.DoTerminate;
  end;
  fTask := nil;
  fStatus := TWorkerStatus.wsSuspended;
end;
- { TScheduler }
// Creates the scheduler thread suspended, working on the task list supplied
// by the caller (the list is referenced, not copied).
//   aTaskList : list of scheduled tasks polled by Execute
constructor TScheduler.Create(aTaskList : TScheduledTaskList);
begin
  inherited Create(True);
  FreeOnTerminate := False;
  fTaskList := aTaskList;
  fRemoveTaskAfterExpiration := False;
  // lock taken by Execute and Get while they walk the list
  fListLock := TCriticalSection.Create;
  // event Execute waits on between polling passes
  {$IFDEF FPC}
  fCondVar := TSimpleEvent.Create;
  {$ELSE}
  fCondVar := TSimpleEvent.Create(nil,True,False,'');
  {$ENDIF}
end;
// Signals the wait event and releases the event and the list lock. The task
// list is only dereferenced, not freed here.
// NOTE(review): the event and lock are freed before the inherited destructor
// runs; TScheduledTasks.Destroy calls Terminate and WaitFor before Free, which
// suggests the thread is expected to have ended already - confirm for other
// callers.
destructor TScheduler.Destroy;
begin
  fCondVar.SetEvent;
  fCondVar.Free;
  fListLock.Free;
  fTaskList := nil;
  inherited;
end;
// Thread body: until terminated, walks the task list under fListLock roughly
// every 250 ms. Every unfinished task whose CheckSchedule returns True is
// launched in its own TScheduledWorker thread. Finished tasks that are still
// enabled get DoExpire called and, when fRemoveTaskAfterExpiration is set,
// are removed from the list.
procedure TScheduler.Execute;
var
  task : IScheduledTask;
  worker : TScheduledWorker;
  numworker : Int64;
  expired : TScheduledTaskList;
begin
  numworker := 0;
  while not Terminated do
  begin
    expired := nil;
    fListLock.Enter;
    try
      for task in fTaskList do
      begin
        if not task.IsFinished then
        begin
          if task.CheckSchedule then
          begin
            Inc(numworker);
            worker := TScheduledWorker.Create(numworker,task);
            worker.Start;
          end;
        end
        else
        begin
          if task.IsEnabled then
          begin
            task.DoExpire;
            // Removing from fTaskList while it is being enumerated would make
            // the loop skip the following element, so only remember the task
            // here and remove it once the pass is complete.
            if fRemoveTaskAfterExpiration then
            begin
              if expired = nil then expired := TScheduledTaskList.Create;
              expired.Add(task);
            end;
          end;
        end;
      end;
      // deferred removal, still under the list lock
      if expired <> nil then
      begin
        for task in expired do fTaskList.Remove(task);
      end;
      task := nil;
    finally
      fListLock.Leave;
      expired.Free;
    end;
    // pause between polling passes
    fCondVar.WaitFor(250);
  end;
end;
// Appends aTask to the task list and returns the value reported by the
// list's Add.
function TScheduler.Add(aTask: TScheduledTask): Integer;
begin
  // Execute and Get walk fTaskList while holding fListLock; take the same
  // lock here so the list is not modified in the middle of such a walk.
  fListLock.Enter;
  try
    Result := fTaskList.Add(aTask);
  finally
    fListLock.Leave;
  end;
end;
// Searches the task list, under fListLock, for the task with the given id.
//   aIdTask : id to search for
// Returns the matching task, or nil when no task has that id.
function TScheduler.Get(aIdTask: Int64): IScheduledTask;
var
  task : IScheduledTask;
begin
  // An interface-typed Result is not guaranteed to start out as nil, so the
  // not-found path must set it explicitly.
  Result := nil;
  fListLock.Enter;
  try
    for task in fTaskList do
    begin
      if task.IdTask = aIdTask then Exit(task);
    end;
  finally
    fListLock.Leave;
  end;
end;
// Searches the task list, under fListLock, for the task with the given name
// (case-insensitive comparison via CompareText).
//   aTaskName : name to search for
// Returns the matching task, or nil when no task has that name.
function TScheduler.Get(const aTaskName: string): IScheduledTask;
var
  task : IScheduledTask;
begin
  // An interface-typed Result is not guaranteed to start out as nil, so the
  // not-found path must set it explicitly.
  Result := nil;
  fListLock.Enter;
  try
    for task in fTaskList do
    begin
      if CompareText(task.Name,aTaskName) = 0 then Exit(task);
    end;
  finally
    fListLock.Leave;
  end;
end;
- end.
|