123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- { ***************************************************************************
- Copyright (c) 2016-2018 Kike Pérez
- Unit : Quick.Threads
- Description : Thread safe collections
- Author : Kike Pérez
- Version : 1.0
- Created : 09/03/2018
- Modified : 12/03/2018
- This file is part of QuickLib: https://github.com/exilon/QuickLib
- ***************************************************************************
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- *************************************************************************** }
- unit Quick.Threads;
- interface
- uses
- Classes,
- System.Generics.Collections,
- System.SyncObjs;
- type
- TThreadedQueueCS<T> = class
- private
- FQueue: array of T;
- FQueueSize, FQueueOffset: Integer;
- FQueueLock: TCriticalSection;
- FQueueCondVar: TConditionVariableCS;
- FShutDown: Boolean;
- FPushTimeout, FPopTimeout: Cardinal;
- FTotalItemsPushed, FTotalItemsPopped: Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- TThreadedQueueList<T> = class
- private
- fQueue : TQueue<T>;
- fQueueSize : Integer;
- fQueueLock : TCriticalSection;
- fQueueCondVar: TConditionVariableCS;
- fShutDown : Boolean;
- fPushTimeout : Cardinal;
- fPopTimeout : Cardinal;
- fTotalItemsPushed : Cardinal;
- fTotalItemsPopped : Cardinal;
- public
- constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- destructor Destroy; override;
- procedure Grow(ADelta: Integer);
- function PushItem(const AItem: T): TWaitResult; overload;
- function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
- function PopItem: T; overload;
- function PopItem(var AQueueSize: Integer): T; overload;
- function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
- function PopItem(var AItem: T): TWaitResult; overload;
- procedure DoShutDown;
- property QueueSize: Integer read FQueueSize;
- property ShutDown: Boolean read FShutDown;
- property TotalItemsPushed: Cardinal read FTotalItemsPushed;
- property TotalItemsPopped: Cardinal read FTotalItemsPopped;
- end;
- TThreadTask<T> = class(TThread)
- private
- fMaxQueue : Integer;
- fInsertTimeout : Integer;
- fExtractTimeout : Integer;
- fTaskQueue : TThreadedQueueCS<T>;
- function GetTaskQueue : Cardinal;
- public
- constructor Create;
- destructor Destroy; override;
- property MaxQueue : Integer read fMaxQueue write fMaxQueue;
- property InsertTimeout : Integer read fInsertTimeout write fInsertTimeout;
- property ExtractTimeout : Integer read fExtractTimeout write fExtractTimeout;
- property TaskQueue : Cardinal read GetTaskQueue;
- procedure Execute; override;
- function AddTask(Task : T) : Boolean;
- procedure Start;
- end;
- implementation
- { TThreadedQueueCS<T> }
- constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
- inherited Create;
- SetLength(FQueue, AQueueDepth);
- FQueueLock := TCriticalSection.Create;
- FQueueCondVar := TConditionVariableCS.Create;
- FPushTimeout := PushTimeout;
- FPopTimeout := PopTimeout;
- end;
- destructor TThreadedQueueCS<T>.Destroy;
- begin
- DoShutDown;
- FQueueLock.Free;
- FQueueCondVar.Free;
- inherited;
- end;
- procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
- begin
- FQueueLock.Enter;
- try
- SetLength(FQueue, Length(FQueue) + ADelta);
- finally
- FQueueLock.Leave;
- end;
- FQueueCondVar.ReleaseAll;
- end;
- function TThreadedQueueCS<T>.PopItem: T;
- var
- LQueueSize: Integer;
- begin
- PopItem(LQueueSize, Result);
- end;
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
- begin
- Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
- end;
- if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := FQueue[FQueueOffset];
- FQueue[FQueueOffset] := Default(T);
- if FQueueSize = Length(FQueue) then FQueueCondVar.ReleaseAll;
- Dec(FQueueSize);
- Inc(FQueueOffset);
- Inc(FTotalItemsPopped);
- if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PopItem(LQueueSize, AItem);
- end;
- function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PushItem(AItem, LQueueSize);
- end;
- function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
- begin
- Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
- end;
- if FShutDown or (Result <> wrSignaled) then Exit;
- if FQueueSize = 0 then FQueueCondVar.ReleaseAll;
- FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
- Inc(FQueueSize);
- Inc(FTotalItemsPushed);
- finally
- AQueueSize := FQueueSize;
- FQueueLock.Leave;
- end;
- end;
- procedure TThreadedQueueCS<T>.DoShutDown;
- begin
- FShutDown := True;
- FQueueCondVar.ReleaseAll;
- end;
- { TThreadedQueueList<T> }
- constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
- begin
- inherited Create;
- fQueue := TQueue<T>.Create;
- fQueue.Capacity := AQueueDepth;
- fQueueSize := 0;
- fQueueLock := TCriticalSection.Create;
- fQueueCondVar := TConditionVariableCS.Create;
- fPushTimeout := PushTimeout;
- fPopTimeout := PopTimeout;
- end;
- destructor TThreadedQueueList<T>.Destroy;
- begin
- DoShutDown;
- fQueueLock.Free;
- fQueueCondVar.Free;
- fQueue.Free;
- inherited;
- end;
- procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
- begin
- fQueueLock.Enter;
- try
- fQueue.Capacity := fQueue.Capacity + ADelta;
- finally
- fQueueLock.Leave;
- end;
- fQueueCondVar.ReleaseAll;
- end;
- function TThreadedQueueList<T>.PopItem: T;
- var
- LQueueSize: Integer;
- begin
- PopItem(LQueueSize, Result);
- end;
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
- begin
- AItem := Default(T);
- fQueueLock.Enter;
- try
- Result := wrSignaled;
- while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
- begin
- Result := fQueueCondVar.WaitFor(fQueueLock, fPopTimeout);
- end;
- if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
- AItem := fQueue.Extract;
- if fQueueSize = fQueue.Count then fQueueCondVar.ReleaseAll;
- Dec(FQueueSize);
- Inc(fTotalItemsPopped);
- finally
- AQueueSize := fQueueSize;
- fQueueLock.Leave;
- end;
- end;
- function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PopItem(LQueueSize, AItem);
- end;
- function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
- begin
- PopItem(AQueueSize, Result);
- end;
- function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
- var
- LQueueSize: Integer;
- begin
- Result := PushItem(AItem, LQueueSize);
- end;
- function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
- begin
- FQueueLock.Enter;
- try
- Result := wrSignaled;
- //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
- //begin
- // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
- //end;
- if fShutDown or (Result <> wrSignaled) then Exit;
- if fQueueSize = 0 then fQueueCondVar.ReleaseAll;
- fQueue.Enqueue(AItem);
- Inc(FQueueSize);
- Inc(fTotalItemsPushed);
- finally
- AQueueSize := fQueueSize;
- FQueueLock.Leave;
- end;
- end;
- procedure TThreadedQueueList<T>.DoShutDown;
- begin
- fShutDown := True;
- fQueueCondVar.ReleaseAll;
- end;
- { TThreadTask<T> }
- function TThreadTask<T>.AddTask(Task: T): Boolean;
- begin
- Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled;
- end;
- constructor TThreadTask<T>.Create;
- begin
- inherited Create(True);
- fMaxQueue := 10;
- fInsertTimeout := INFINITE;
- fExtractTimeout := INFINITE;
- end;
- destructor TThreadTask<T>.Destroy;
- begin
- if Assigned(fTaskQueue) then fTaskQueue.Free;
- inherited;
- end;
- procedure TThreadTask<T>.Execute;
- begin
- inherited;
- end;
- function TThreadTask<T>.GetTaskQueue: Cardinal;
- begin
- if Assigned(fTaskQueue) then Result := fTaskQueue.QueueSize
- else Result := 0;
- end;
- procedure TThreadTask<T>.Start;
- begin
- fTaskQueue := TThreadedQueueCS<T>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
- end;
- end.
|