{ *************************************************************************** Copyright (c) 2016-2018 Kike Pérez Unit : Quick.Threads Description : Thread safe collections Author : Kike Pérez Version : 1.2 Created : 09/03/2018 Modified : 07/04/2018 This file is part of QuickLib: https://github.com/exilon/QuickLib *************************************************************************** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. *************************************************************************** } unit Quick.Threads; {$i QuickLib.inc} interface uses Classes, Types, //Quick.Chrono, {$IFNDEF FPC} System.RTLConsts, System.Generics.Collections, System.SyncObjs; {$ELSE} RtlConsts, Generics.Collections, syncobjs, SysUtils; {$ENDIF} type TThreadedQueueCS = class private FQueue: array of T; FQueueSize, FQueueOffset: Integer; FQueueLock: TCriticalSection; {$IFDEF FPC} FQueueCondVar : TEventObject; {$ELSE} FQueueCondVar: TConditionVariableCS; {$ENDIF} FShutDown: Boolean; FPushTimeout, FPopTimeout: Cardinal; FTotalItemsPushed, FTotalItemsPopped: Cardinal; public constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); destructor Destroy; override; procedure Grow(ADelta: Integer); function PushItem(const AItem: T): TWaitResult; overload; function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload; function PopItem: T; overload; function PopItem(var AQueueSize: Integer): T; overload; function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload; function PopItem(var AItem: T): TWaitResult; overload; procedure DoShutDown; property QueueSize: Integer read FQueueSize; property ShutDown: Boolean read FShutDown; property TotalItemsPushed: Cardinal read FTotalItemsPushed; property TotalItemsPopped: Cardinal read FTotalItemsPopped; end; TThreadedQueueList = class private fQueue : TQueue; fQueueSize : Integer; fQueueLock : TCriticalSection; {$IFDEF FPC} FQueueCondVar : TSimpleEvent; {$ELSE} FQueueCondVar: TConditionVariableCS; {$ENDIF} fShutDown : Boolean; fPushTimeout : Cardinal; fPopTimeout : Cardinal; fTotalItemsPushed : Cardinal; fTotalItemsPopped : Cardinal; public constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); destructor Destroy; override; procedure Grow(ADelta: Integer); function PushItem(const AItem: T): TWaitResult; overload; function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload; function PopItem: T; overload; function PopItem(var AQueueSize: Integer): T; overload; function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload; function PopItem(var AItem: T): TWaitResult; overload; procedure DoShutDown; property QueueSize: Integer read FQueueSize; property ShutDown: Boolean read FShutDown; property TotalItemsPushed: Cardinal read FTotalItemsPushed; property TotalItemsPopped: Cardinal read FTotalItemsPopped; end; TThreadTask = class(TThread) private fMaxQueue : Integer; fInsertTimeout : Integer; fExtractTimeout : Integer; fTaskQueue : TThreadedQueueCS; function GetTaskQueue : Cardinal; public constructor Create; destructor Destroy; override; property MaxQueue : Integer read fMaxQueue write fMaxQueue; property InsertTimeout : Integer read fInsertTimeout write fInsertTimeout; property ExtractTimeout : Integer read fExtractTimeout write fExtractTimeout; property TaskQueue : Cardinal read GetTaskQueue; procedure Execute; override; function AddTask(Task : T) : Boolean; procedure Start; end; {$IFNDEF FPC} TThreadObjectList = class(TList) private fList: TObjectList; fLock: TObject; fDuplicates: TDuplicates; function GetItem(aIndex : Integer) : T; procedure SetItem(aIndex : Integer; aValue : T); public constructor Create(OwnedObjects : Boolean); destructor Destroy; override; property Items[Index : Integer] : T read GetItem write SetItem ; default; procedure Add(const Item: T); procedure Clear; function LockList: TObjectList; procedure Remove(const Item: T); inline; procedure RemoveItem(const Item: T; Direction: TDirection); procedure UnlockList; inline; property Duplicates: TDuplicates read fDuplicates write fDuplicates; end; {$ENDIF} implementation { TThreadedQueueCS } constructor TThreadedQueueCS.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); begin inherited Create; SetLength(FQueue, AQueueDepth); FQueueLock := TCriticalSection.Create; {$IFDEF FPC} FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS'); {$ELSE} FQueueCondVar := TConditionVariableCS.Create; {$ENDIF} FPushTimeout := PushTimeout; FPopTimeout := PopTimeout; end; destructor TThreadedQueueCS.Destroy; begin DoShutDown; FQueueLock.Free; FQueueCondVar.Free; inherited; end; procedure TThreadedQueueCS.Grow(ADelta: Integer); begin FQueueLock.Enter; try SetLength(FQueue, Length(FQueue) + ADelta); finally FQueueLock.Leave; end; {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; function TThreadedQueueCS.PopItem: T; var LQueueSize: Integer; begin PopItem(LQueueSize, Result); end; function TThreadedQueueCS.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; begin AItem := Default(T); FQueueLock.Enter; try Result := wrSignaled; while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do begin {$IFDEF FPC} Result := FQueueCondVar.WaitFor(FPopTimeout); {$ELSE} Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout); {$ENDIF} end; if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit; AItem := FQueue[FQueueOffset]; FQueue[FQueueOffset] := Default(T); if FQueueSize = Length(FQueue) then begin {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; Dec(FQueueSize); Inc(FQueueOffset); Inc(FTotalItemsPopped); if FQueueOffset = Length(FQueue) then FQueueOffset := 0; finally AQueueSize := FQueueSize; FQueueLock.Leave; end; end; function TThreadedQueueCS.PopItem(var AItem: T): TWaitResult; var LQueueSize: Integer; begin Result := PopItem(LQueueSize, AItem); end; function TThreadedQueueCS.PopItem(var AQueueSize: Integer): T; begin PopItem(AQueueSize, Result); end; function TThreadedQueueCS.PushItem(const AItem: T): TWaitResult; var LQueueSize: Integer; begin Result := PushItem(AItem, LQueueSize); end; function TThreadedQueueCS.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; begin FQueueLock.Enter; try Result := wrSignaled; while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do begin {$IFDEF FPC} Result := FQueueCondVar.WaitFor(FPushTimeout); {$ELSE} Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout); {$ENDIF} end; if FShutDown or (Result <> wrSignaled) then Exit; if FQueueSize = 0 then begin {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem; Inc(FQueueSize); Inc(FTotalItemsPushed); finally AQueueSize := FQueueSize; FQueueLock.Leave; end; end; procedure TThreadedQueueCS.DoShutDown; begin FShutDown := True; {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; { TThreadedQueueList } constructor TThreadedQueueList.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE); begin inherited Create; fQueue := TQueue.Create; fQueue.Capacity := AQueueDepth; fQueueSize := 0; fQueueLock := TCriticalSection.Create; {$IFDEF FPC} FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL'); {$ELSE} fQueueCondVar := TConditionVariableCS.Create; {$ENDIF} fPushTimeout := PushTimeout; fPopTimeout := PopTimeout; end; destructor TThreadedQueueList.Destroy; begin DoShutDown; fQueueLock.Free; fQueueCondVar.Free; fQueue.Free; inherited; end; procedure TThreadedQueueList.Grow(ADelta: Integer); begin fQueueLock.Enter; try fQueue.Capacity := fQueue.Capacity + ADelta; finally fQueueLock.Leave; end; {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; function TThreadedQueueList.PopItem: T; var LQueueSize: Integer; begin PopItem(LQueueSize, Result); end; {$IFDEF FPC} function TThreadedQueueList.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; //var //crono : TChronometer; begin AItem := Default(T); //crono := TChronometer.Create(False); try Result := wrSignaled; //writeln('popitem'); //crono.Start; while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do begin //crono.Start; Result := FQueueCondVar.WaitFor(FPopTimeout); //crono.Stop; //writeln('in: ' + crono.ElapsedTime); //if result = twaitresult.wrError then result := twaitresult.wrError; end; //crono.Stop; //writeln('out: ' + crono.ElapsedTime); fQueueLock.Enter; try if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit; AItem := fQueue.Extract; Dec(FQueueSize); Inc(fTotalItemsPopped); finally fQueueLock.Leave; end; finally AQueueSize := fQueueSize; end; end; {$ELSE} function TThreadedQueueList.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; begin AItem := Default(T); fQueueLock.Enter; try Result := wrSignaled; while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do begin Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout); end; if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit; AItem := fQueue.Extract; if fQueueSize = fQueue.Count then begin FQueueCondVar.ReleaseAll; end; Dec(FQueueSize); Inc(fTotalItemsPopped); finally AQueueSize := fQueueSize; fQueueLock.Leave; end; end; {$ENDIF} function TThreadedQueueList.PopItem(var AItem: T): TWaitResult; var LQueueSize: Integer; begin Result := PopItem(LQueueSize, AItem); end; function TThreadedQueueList.PopItem(var AQueueSize: Integer): T; begin PopItem(AQueueSize, Result); end; function TThreadedQueueList.PushItem(const AItem: T): TWaitResult; var LQueueSize: Integer; begin Result := PushItem(AItem, LQueueSize); end; {$IFDEF FPC} function TThreadedQueueList.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; begin FQueueLock.Enter; try Result := wrSignaled; //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do //begin // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout); //end; if fShutDown or (Result <> wrSignaled) then Exit; //if fQueueSize = 0 then //begin // FQueueCondVar.SetEvent; //end; fQueue.Enqueue(AItem); Inc(FQueueSize); Inc(fTotalItemsPushed); finally AQueueSize := fQueueSize; FQueueLock.Leave; //FQueueCondVar.SetEvent; end; end; {$ELSE} function TThreadedQueueList.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; begin FQueueLock.Enter; try Result := wrSignaled; //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do //begin // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout); //end; if fShutDown or (Result <> wrSignaled) then Exit; if fQueueSize = 0 then FQueueCondVar.ReleaseAll; fQueue.Enqueue(AItem); Inc(FQueueSize); Inc(fTotalItemsPushed); finally AQueueSize := fQueueSize; FQueueLock.Leave; end; end; {$ENDIF} procedure TThreadedQueueList.DoShutDown; begin fShutDown := True; {$IFDEF FPC} FQueueCondVar.SetEvent; {$ELSE} FQueueCondVar.ReleaseAll; {$ENDIF} end; { TThreadTask } function TThreadTask.AddTask(Task: T): Boolean; begin Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled; end; constructor TThreadTask.Create; begin inherited Create(True); fMaxQueue := 10; fInsertTimeout := INFINITE; fExtractTimeout := INFINITE; end; destructor TThreadTask.Destroy; begin if Assigned(fTaskQueue) then fTaskQueue.Free; inherited; end; procedure TThreadTask.Execute; begin inherited; end; function TThreadTask.GetTaskQueue: Cardinal; begin if Assigned(fTaskQueue) then Result := fTaskQueue.QueueSize else Result := 0; end; procedure TThreadTask.Start; begin fTaskQueue := TThreadedQueueCS.Create(fMaxQueue,fInsertTimeout,fExtractTimeout); end; {$IFNDEF FPC} { TThreadObjectList } procedure TThreadObjectList.Add(const Item: T); begin LockList; try if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item) else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]); finally UnlockList; end; end; procedure TThreadObjectList.Clear; begin LockList; try fList.Clear; finally UnlockList; end; end; constructor TThreadObjectList.Create(OwnedObjects : Boolean); begin inherited Create; fLock := TObject.Create; fList := TObjectList.Create; fDuplicates := dupIgnore; end; destructor TThreadObjectList.Destroy; begin LockList; try fList.Free; inherited Destroy; finally UnlockList; fLock.Free; end; end; function TThreadObjectList.GetItem(aIndex: Integer): T; begin LockList; try Result := fList[aIndex]; finally UnlockList; end; end; function TThreadObjectList.LockList: TObjectList; begin System.TMonitor.Enter(fLock); Result := fList; end; procedure TThreadObjectList.Remove(const Item: T); begin RemoveItem(Item, TDirection.FromBeginning); end; procedure TThreadObjectList.RemoveItem(const Item: T; Direction: TDirection); begin LockList; try fList.RemoveItem(Item, Direction); finally UnlockList; end; end; procedure TThreadObjectList.SetItem(aIndex: Integer; aValue: T); begin LockList; try fList[aIndex] := aValue; finally UnlockList; end; end; procedure TThreadObjectList.UnlockList; begin System.TMonitor.Exit(fLock); end; {$ENDIF} end.