| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890 |
{
 **********************************************************************
  This file is part of the Free Pascal run time library.

  See the file COPYING.FPC, included in this distribution,
  for details about the license.
 **********************************************************************

  Unit for lightweight threads.

  Copyright (C) 2008 Mattias Gaertner [email protected]

  Abstract:
    Lightweight threads.

    This unit provides methods to easily run a procedure or method with
    several threads at once.
}
- unit MTProcs;
- {$mode objfpc}{$H+}
- {$inline on}
- {$ModeSwitch nestedprocvars}
- interface
- uses
- Classes, SysUtils, MTPCPU;
type
  // forward declarations: the classes of this unit reference each other
  TProcThreadGroup = class;
  TProcThreadPool = class;
  TProcThread = class;

  { TMTPThreadState
    State of a TMultiThreadProcItem (kept in its FState field). }
  TMTPThreadState = (
    mtptsNone,
    mtptsActive,          // running a task, or about to
    mtptsWaitingForIndex, // blocked in WaitForIndexRange
    mtptsWaitingFailed,   // the wait was aborted (group exception or pool shutdown)
    mtptsInactive,        // no work assigned
    mtptsTerminated       // thread was moved to the pool's terminated list
    );
- TMultiThreadProcItem = class
- private
- FGroup: TProcThreadGroup;
- FIndex: PtrInt;
- FThread: TProcThread;
- FWaitingForIndexEnd: PtrInt;
- FWaitingForIndexStart: PtrInt;
- fWaitForPool: PRTLEvent;
- FState: TMTPThreadState;
- public
- destructor Destroy; override;
- function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
- function WaitForIndex(Index: PtrInt): boolean; inline;
- procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
- out BlockStart, BlockEnd: PtrInt); inline;
- property Index: PtrInt read FIndex;
- property Group: TProcThreadGroup read FGroup;
- property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
- property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
- property Thread: TProcThread read FThread;
- end;
- { TProcThread }
- TMTPThreadList = (
- mtptlPool,
- mtptlGroup
- );
- TProcThread = class(TThread)
- private
- FItem: TMultiThreadProcItem;
- FNext, FPrev: array[TMTPThreadList] of TProcThread;
- procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
- procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
- procedure Terminating(aPool: TProcThreadPool; E: Exception);
- public
- constructor Create;
- destructor Destroy; override;
- procedure Execute; override;
- property Item: TMultiThreadProcItem read FItem;
- end;
- TMTMethod = procedure(Index: PtrInt; Data: Pointer;
- Item: TMultiThreadProcItem) of object;
- TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
- Item: TMultiThreadProcItem);
- TMTNestedProcedure = procedure(Index: PtrInt; Data: Pointer;
- Item: TMultiThreadProcItem) is nested;
- { TProcThreadGroup
- Each task creates a new group of threads.
- A group can either need more threads or it has finished and waits for its
- threads to end.
- The thread that created the group is not in the list FFirstThread. }
- TMTPGroupState = (
- mtpgsNone,
- mtpgsNeedThreads, // the groups waiting for more threads to help
- mtpgsFinishing, // the groups waiting for its threads to finish
- mtpgsException // there was an exception => close asap
- );
- TProcThreadGroup = class
- private
- FEndIndex: PtrInt;
- FException: Exception;
- FFirstRunningIndex: PtrInt;
- FFirstThread: TProcThread;
- FLastRunningIndex: PtrInt;
- FMaxThreads: PtrInt;
- FNext, FPrev: TProcThreadGroup;
- FPool: TProcThreadPool;
- FStarterItem: TMultiThreadProcItem;
- FStartIndex: PtrInt;
- FState: TMTPGroupState;
- FTaskData: Pointer;
- FTaskFrame: Pointer;
- FTaskMethod: TMTMethod;
- FTaskNested: TMTNestedProcedure;
- FTaskProcedure: TMTProcedure;
- FThreadCount: PtrInt;
- procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
- procedure RemoveFromList(var First: TProcThreadGroup); inline;
- function NeedMoreThreads: boolean; inline;
- procedure IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
- procedure AddThread(AThread: TProcThread);
- procedure RemoveThread(AThread: TProcThread); inline;
- procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
- procedure IndexComplete(Index: PtrInt);
- procedure WakeThreadsWaitingForIndex;
- function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
- procedure EnterExceptionState(E: Exception);
- public
- constructor Create;
- destructor Destroy; override;
- property Pool: TProcThreadPool read FPool;
- property StartIndex: PtrInt read FStartIndex;
- property EndIndex: PtrInt read FEndIndex;
- property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
- property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
- property TaskData: Pointer read FTaskData;
- property TaskMethod: TMTMethod read FTaskMethod;
- property TaskNested: TMTNestedProcedure read FTaskNested;
- property TaskProcedure: TMTProcedure read FTaskProcedure;
- property TaskFrame: Pointer read FTaskFrame;
- property MaxThreads: PtrInt read FMaxThreads;
- property StarterItem: TMultiThreadProcItem read FStarterItem;
- end;
- { TLightWeightThreadPool
- Group 0 are the inactive threads }
- { TProcThreadPool }
- TProcThreadPool = class
- private
- FMaxThreadCount: PtrInt;
- FThreadCount: PtrInt;
- FFirstInactiveThread: TProcThread;
- FFirstActiveThread: TProcThread;
- FFirstTerminatedThread: TProcThread;
- FFirstGroupNeedThreads: TProcThreadGroup;
- FFirstGroupFinishing: TProcThreadGroup;
- FCritSection: TRTLCriticalSection;
- FDestroying: boolean;
- procedure SetMaxThreadCount(const AValue: PtrInt);
- procedure CleanTerminatedThreads;
- procedure DoParallelIntern(const AMethod: TMTMethod;
- const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
- const AFrame: Pointer; StartIndex, EndIndex: PtrInt;
- Data: Pointer = nil; MaxThreads: PtrInt = 0);
- public
- // for debugging only: the critical section is public:
- procedure EnterPoolCriticalSection; inline;
- procedure LeavePoolCriticalSection; inline;
- public
- constructor Create;
- destructor Destroy; override;
- procedure DoParallel(const AMethod: TMTMethod;
- StartIndex, EndIndex: PtrInt;
- Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
- procedure DoParallel(const AProc: TMTProcedure;
- StartIndex, EndIndex: PtrInt;
- Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
- procedure DoParallelNested(const ANested: TMTNestedProcedure;
- StartIndex, EndIndex: PtrInt;
- Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
- // experimental
- procedure DoParallelLocalProc(const LocalProc: Pointer;
- StartIndex, EndIndex: PtrInt;
- Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
- // utility functions for loops:
- procedure CalcBlockSize(LoopLength: PtrInt;
- out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt = 0); inline;
- public
- property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
- property ThreadCount: PtrInt read FThreadCount;
- end;
var
  // the default pool; created and freed by this unit's
  // initialization / finalization sections
  ProcThreadPool: TProcThreadPool = nil;

threadvar
  // TProcThread sets this, you can set this for your own TThread descendants
  CurrentThread: TThread;
- implementation
- { TMultiThreadProcItem }
// Releases the wait event, if one was created, then destroys the item.
destructor TMultiThreadProcItem.Destroy;
var
  PoolEvent: PRTLEvent;
begin
  PoolEvent:=fWaitForPool;
  if PoolEvent<>nil then begin
    fWaitForPool:=nil;
    RTLeventdestroy(PoolEvent);
  end;
  inherited Destroy;
end;
// Blocks the calling task until the indices StartIndex..EndIndex of its group
// are finished.
// Returns true if the range is finished (or nothing had to be waited for),
// false if the range reaches up to the caller's own index, the pool is being
// destroyed, the group is in exception state, or the wait was aborted.
function TMultiThreadProcItem.WaitForIndexRange(
  StartIndex, EndIndex: PtrInt): boolean;
var
  OwnerPool: TProcThreadPool;
begin
  // only indices below the own index can be waited for
  if EndIndex>=Index then
    exit(false);
  // an empty range is trivially finished;
  // a single threaded run has no group object and runs the indices in order
  if (EndIndex<StartIndex) or (Group=nil) then
    exit(true);

  // multi threaded group
  OwnerPool:=Group.Pool;
  if OwnerPool.FDestroying then
    exit(false); // no more wait allowed

  OwnerPool.EnterPoolCriticalSection;
  try
    if Group.FState=mtpgsException then
      exit(false); // group is closing because of an error
    if Group.HasFinishedIndex(StartIndex,EndIndex) then
      exit(true);  // range already finished
    // register as waiting, so that the group can wake this item
    FState:=mtptsWaitingForIndex;
    FWaitingForIndexStart:=StartIndex;
    FWaitingForIndexEnd:=EndIndex;
    if fWaitForPool=nil then
      fWaitForPool:=RTLEventCreate;
    RTLeventResetEvent(fWaitForPool);
  finally
    OwnerPool.LeavePoolCriticalSection;
  end;

  // the waker stores the outcome in FState before it sets the event
  RTLeventWaitFor(fWaitForPool);
  Result:=(FState=mtptsActive);
  FState:=mtptsActive;
end;
// Convenience form of WaitForIndexRange for a single index.
function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
begin
  exit(WaitForIndexRange(Index,Index));
end;
// Computes the inclusive bounds of block number Index when a loop of
// LoopLength iterations is cut into blocks of BlockSize iterations.
// The last block is clipped so that BlockEnd never exceeds LoopLength-1.
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
  out BlockStart, BlockEnd: PtrInt);
var
  PastEnd: PtrInt;
begin
  BlockStart:=Index*BlockSize;
  PastEnd:=BlockStart+BlockSize;
  if PastEnd>LoopLength then
    PastEnd:=LoopLength;
  BlockEnd:=PastEnd-1;
end;
- { TProcThread }
// Inserts this thread at the head of the list starting at First, using the
// link pair selected by ListType.
procedure TProcThread.AddToList(var First: TProcThread;
  ListType: TMTPThreadList);
var
  OldHead: TProcThread;
begin
  OldHead:=First;
  FNext[ListType]:=OldHead;
  if OldHead<>nil then
    OldHead.FPrev[ListType]:=Self;
  First:=Self;
end;
// Unlinks this thread from the list starting at First (link pair ListType)
// and clears its own links.
procedure TProcThread.RemoveFromList(var First: TProcThread;
  ListType: TMTPThreadList);
var
  Successor, Predecessor: TProcThread;
begin
  Successor:=FNext[ListType];
  Predecessor:=FPrev[ListType];
  if First=Self then
    First:=Successor;
  if Successor<>nil then
    Successor.FPrev[ListType]:=Predecessor;
  if Predecessor<>nil then
    Predecessor.FNext[ListType]:=Successor;
  FNext[ListType]:=nil;
  FPrev[ListType]:=nil;
end;
// Called at the end of Execute. Detaches the thread from its group (if it
// still has one, the group is put into exception state with E) and moves the
// thread to the pool's list of terminated threads.
procedure TProcThread.Terminating(aPool: TProcThreadPool;
  E: Exception);
var
  OwningGroup: TProcThreadGroup;
begin
  aPool.EnterPoolCriticalSection;
  try
    OwningGroup:=Item.FGroup;
    if OwningGroup<>nil then begin
      // still in a group => an exception occurred
      OwningGroup.EnterExceptionState(E);
      OwningGroup.RemoveThread(Self);
      Item.FGroup:=nil;
    end;
    // leave the pool list the thread is currently in ...
    if Item.FState=mtptsActive then
      RemoveFromList(aPool.FFirstActiveThread,mtptlPool)
    else if Item.FState=mtptsInactive then
      RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
    // ... and join the terminated ones
    AddToList(aPool.FFirstTerminatedThread,mtptlPool);
    Item.FState:=mtptsTerminated;
  finally
    aPool.LeavePoolCriticalSection;
  end;
end;
// Creates the thread suspended, together with its item and the item's
// wait event.
constructor TProcThread.Create;
begin
  inherited Create(true);
  FItem:=TMultiThreadProcItem.Create;
  FItem.FThread:=Self;
  FItem.fWaitForPool:=RTLEventCreate;
end;
// Frees the item owned by this thread, then the thread itself.
destructor TProcThread.Destroy;
var
  OwnedItem: TMultiThreadProcItem;
begin
  OwnedItem:=FItem;
  FItem:=nil;
  OwnedItem.Free;
  inherited Destroy;
end;
// Thread main loop: run the task for the current index, then fetch the next
// index of the group, or switch to another group that needs threads, or park
// until the pool signals the item's event. The loop ends when the thread is
// woken without a group, or when the task raises an exception.
procedure TProcThread.Execute;
var
  OwnerPool: TProcThreadPool;
  CurGroup: TProcThreadGroup;
  Failure: Exception;
begin
  MTProcs.CurrentThread:=Self;
  OwnerPool:=Item.Group.Pool;
  Failure:=nil;
  try
    repeat
      // work
      CurGroup:=Item.Group;
      CurGroup.Run(Item.Index,CurGroup.TaskData,Item);

      OwnerPool.EnterPoolCriticalSection;
      try
        CurGroup.IndexComplete(Item.Index);
        // find next work
        if CurGroup.LastRunningIndex<CurGroup.EndIndex then
          // next index of the same group
          CurGroup.IncreaseLastRunningIndex(Item)
        else begin
          // the group has no more indices: leave it
          RemoveFromList(CurGroup.FFirstThread,mtptlGroup);
          dec(CurGroup.FThreadCount);
          Item.FGroup:=nil;
          if OwnerPool.FFirstGroupNeedThreads<>nil then
            // help another group
            OwnerPool.FFirstGroupNeedThreads.AddThread(Self)
          else begin
            // nothing to do: mark inactive
            RemoveFromList(OwnerPool.FFirstActiveThread,mtptlPool);
            AddToList(OwnerPool.FFirstInactiveThread,mtptlPool);
            Item.FState:=mtptsInactive;
            RTLeventResetEvent(Item.fWaitForPool);
          end;
        end;
      finally
        OwnerPool.LeavePoolCriticalSection;
      end;

      // wait for new work
      if Item.FState=mtptsInactive then
        RTLeventWaitFor(Item.fWaitForPool);
    until Item.Group=nil;
  except
    // stop the exception and keep the object for the group
    Failure:=Exception(AcquireExceptionObject);
  end;
  Terminating(OwnerPool,Failure);
end;
- { TProcThreadGroup }
// Inserts this group at the head of the list starting at First and records
// ListType as the group's new state.
procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
  ListType: TMTPGroupState);
var
  OldHead: TProcThreadGroup;
begin
  OldHead:=First;
  FNext:=OldHead;
  if OldHead<>nil then
    OldHead.FPrev:=Self;
  First:=Self;
  FState:=ListType;
end;
// Unlinks this group from the list starting at First, clears its links and
// resets its state to mtpgsNone.
procedure TProcThreadGroup.RemoveFromList(
  var First: TProcThreadGroup);
var
  Successor, Predecessor: TProcThreadGroup;
begin
  Successor:=FNext;
  Predecessor:=FPrev;
  if First=Self then
    First:=Successor;
  if Successor<>nil then
    Successor.FPrev:=Predecessor;
  if Predecessor<>nil then
    Predecessor.FNext:=Successor;
  FNext:=nil;
  FPrev:=nil;
  FState:=mtpgsNone;
end;
// True while there are indices left to hand out, the group may still grow
// and it is not in exception state.
function TProcThreadGroup.NeedMoreThreads: boolean;
begin
  if FState=mtpgsException then
    exit(false);
  Result:=(FThreadCount<FMaxThreads) and (FLastRunningIndex<FEndIndex);
end;
// Hands the next index to Item. When the group does not need further threads
// afterwards, it is moved from the pool's "need threads" list to the
// "finishing" list.
procedure TProcThreadGroup.IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
begin
  inc(FLastRunningIndex);
  Item.FIndex:=FLastRunningIndex;
  if (not NeedMoreThreads) and (FState=mtpgsNeedThreads) then begin
    RemoveFromList(Pool.FFirstGroupNeedThreads);
    AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
  end;
end;
// Makes AThread a member of this group and gives it the next index.
procedure TProcThreadGroup.AddThread(AThread: TProcThread);
begin
  inc(FThreadCount);
  AThread.AddToList(FFirstThread,mtptlGroup);
  AThread.Item.FGroup:=Self;
  IncreaseLastRunningIndex(AThread.Item);
end;
// Takes AThread out of the group's thread list and adjusts the thread count.
procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
begin
  dec(FThreadCount);
  AThread.RemoveFromList(FFirstThread,mtptlGroup);
end;
// Calls the group's task for one index. Exactly one callback kind is used,
// tested in this order: local procedure with frame, plain procedure, nested
// procedure, method.
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
  Item: TMultiThreadProcItem); inline;
begin
  if Assigned(FTaskFrame) then begin
    CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item);
    exit;
  end;
  if Assigned(FTaskProcedure) then begin
    FTaskProcedure(Index,Data,Item);
    exit;
  end;
  if Assigned(FTaskNested) then begin
    FTaskNested(Index,Data,Item);
    exit;
  end;
  FTaskMethod(Index,Data,Item);
end;
// Records that Index has been finished: recomputes FFirstRunningIndex and
// wakes every thread whose awaited range is now complete.
// Must be called inside the pool's critical section, before the caller picks
// its next index (its item still carries Index at this point).
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
var
  AThread: TProcThread;
  NewFirstRunningIndex: PtrInt;
begin
  // update FirstRunningIndex = lowest index that may still be unfinished.
  // The just completed Index must not count, no matter whether a helper
  // thread or the starter thread ran it. Otherwise threads waiting for an
  // index completed by the starter are never woken and the group can
  // deadlock.
  if FStarterItem.Index<>Index then
    NewFirstRunningIndex:=FStarterItem.Index
  else
    // The starter itself completed Index. FLastRunningIndex is a safe upper
    // bound: a task can only wait for indices below its own, and no index
    // above FLastRunningIndex has been started.
    NewFirstRunningIndex:=FLastRunningIndex;
  AThread:=FFirstThread;
  while AThread<>nil do begin
    if (AThread.Item.Index<>Index)
    and (AThread.Item.Index<NewFirstRunningIndex) then
      NewFirstRunningIndex:=AThread.Item.Index;
    AThread:=AThread.FNext[mtptlGroup];
  end;
  FFirstRunningIndex:=NewFirstRunningIndex;
  // wake up threads (Note: do this even if FFirstRunningIndex has not changed)
  WakeThreadsWaitingForIndex;
end;
// Checks all items of the group (helper threads and the starter item) that
// are blocked in WaitForIndexRange.
// Normal operation: an item is woken as mtptsActive once its awaited range
// is finished. Exception state: every waiting item is woken as
// mtptsWaitingFailed.
procedure TProcThreadGroup.WakeThreadsWaitingForIndex;

  procedure Release(Waiter: TMultiThreadProcItem; Outcome: TMTPThreadState);
  begin
    // the state must be stored before the event is set
    Waiter.FState:=Outcome;
    RTLeventSetEvent(Waiter.fWaitForPool);
  end;

  procedure Check(Waiter: TMultiThreadProcItem);
  begin
    if Waiter.FState<>mtptsWaitingForIndex then
      exit;
    if FState=mtpgsException then
      // end group: the wait fails
      Release(Waiter,mtptsWaitingFailed)
    else if HasFinishedIndex(Waiter.WaitingForIndexStart,
                             Waiter.WaitingForIndexEnd)
    then
      Release(Waiter,mtptsActive);
  end;

var
  Cur: TProcThread;
begin
  // helper threads first ...
  Cur:=FFirstThread;
  while Cur<>nil do begin
    Check(Cur.Item);
    Cur:=Cur.FNext[mtptlGroup];
  end;
  // ... then the starter thread of this group
  Check(FStarterItem);
end;
// Returns true if none of the indices aStartIndex..aEndIndex is still being
// worked on or still waiting to be started.
function TProcThreadGroup.HasFinishedIndex(
  aStartIndex, aEndIndex: PtrInt): boolean;

  function InRange(Candidate: PtrInt): boolean;
  begin
    Result:=(Candidate>=aStartIndex) and (Candidate<=aEndIndex);
  end;

var
  Cur: TProcThread;
begin
  // the whole range lies below the first running index => finished
  if FFirstRunningIndex>aEndIndex then
    exit(true);
  // the range reaches into indices that were not started yet => unfinished
  if FLastRunningIndex<aEndIndex then
    exit(false);
  // otherwise look at every index that is currently assigned to an item
  Result:=false;
  Cur:=FFirstThread;
  while Cur<>nil do begin
    if InRange(Cur.Item.Index) then
      exit;
    Cur:=Cur.FNext[mtptlGroup];
  end;
  Result:=not InRange(FStarterItem.Index);
end;
// Puts the group into exception state: it leaves the pool's group list, no
// further threads are requested and all waiting threads are woken as failed.
// E is the exception object that caused this (may be nil). The group takes
// ownership of E: it is either stored in FException or freed here.
procedure TProcThreadGroup.EnterExceptionState(E: Exception);
begin
  if FState=mtpgsException then begin
    // Already closing. Only one exception can be stored; a surplus one was
    // acquired by the failing thread and would leak if simply dropped.
    if FException=nil then
      FException:=E
    else
      E.Free; // Free is safe for nil
    exit;
  end;
  case FState of
  mtpgsFinishing:   RemoveFromList(Pool.FFirstGroupFinishing);
  mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
  end;
  FState:=mtpgsException;
  FException:=E;
  WakeThreadsWaitingForIndex;
end;
// Creates the group together with the item for the thread that starts it.
constructor TProcThreadGroup.Create;
var
  Starter: TMultiThreadProcItem;
begin
  Starter:=TMultiThreadProcItem.Create;
  Starter.FGroup:=Self;
  FStarterItem:=Starter;
end;
// Frees the starter item owned by the group.
destructor TProcThreadGroup.Destroy;
var
  Starter: TMultiThreadProcItem;
begin
  Starter:=FStarterItem;
  FStarterItem:=nil;
  Starter.Free;
  inherited Destroy;
end;
- { TProcThreadPool }
// Setter of MaxThreadCount. Values below 1 are rejected with an Exception.
procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
begin
  if AValue=FMaxThreadCount then
    exit;
  if AValue>=1 then
    FMaxThreadCount:=AValue
  else
    raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
end;
// Frees all threads of the terminated list.
// Each freed thread is subtracted from FThreadCount; without that, every
// thread lost to a task exception would permanently reduce the number of
// threads the pool is able to create.
// The list is shared with TProcThread.Terminating, which modifies it inside
// the pool's critical section, so threads are unlinked under the same lock.
// The thread object itself is freed outside the lock.
procedure TProcThreadPool.CleanTerminatedThreads;
var
  AThread: TProcThread;
begin
  repeat
    EnterPoolCriticalSection;
    try
      AThread:=FFirstTerminatedThread;
      if AThread<>nil then begin
        AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool);
        dec(FThreadCount);
      end;
    finally
      LeavePoolCriticalSection;
    end;
    if AThread=nil then
      break;
    AThread.Free;
  until false;
end;
// Creates an empty pool. MaxThreadCount defaults to the value reported by
// GetSystemThreadCount, but is at least 1.
constructor TProcThreadPool.Create;
var
  Detected: PtrInt;
begin
  Detected:=GetSystemThreadCount;
  if Detected<1 then
    Detected:=1;
  FMaxThreadCount:=Detected;
  InitCriticalSection(FCritSection);
end;
// Shuts the pool down:
//  1. abort all pending WaitForIndexRange calls,
//  2. wait until no thread is active any more,
//  3. wake the inactive threads (they end, because they have no group),
//  4. free the terminated threads and the critical section.
destructor TProcThreadPool.Destroy;

  procedure AbortWait(Item: TMultiThreadProcItem);
  begin
    if Item.FState<>mtptsWaitingForIndex then
      exit;
    Item.FState:=mtptsWaitingFailed;
    RTLeventSetEvent(Item.fWaitForPool);
  end;

  procedure AbortStarterWaits(Group: TProcThreadGroup);
  begin
    while Group<>nil do begin
      AbortWait(Group.StarterItem);
      Group:=Group.FNext;
    end;
  end;

var
  Cur: TProcThread;
begin
  FDestroying:=true;

  // wake up all waiting threads
  EnterPoolCriticalSection;
  try
    Cur:=FFirstActiveThread;
    while Cur<>nil do begin
      AbortWait(Cur.Item);
      Cur:=Cur.FNext[mtptlPool];
    end;
    AbortStarterWaits(FFirstGroupNeedThreads);
    AbortStarterWaits(FFirstGroupFinishing);
  finally
    LeavePoolCriticalSection;
  end;

  // wait for all active threads to become inactive
  while FFirstActiveThread<>nil do
    Sleep(10);

  // wake up all inactive threads (without new work they will terminate)
  EnterPoolCriticalSection;
  try
    Cur:=FFirstInactiveThread;
    while Cur<>nil do begin
      RTLeventSetEvent(Cur.Item.fWaitForPool);
      Cur:=Cur.FNext[mtptlPool];
    end;
  finally
    LeavePoolCriticalSection;
  end;

  // wait for all threads to terminate
  while FFirstInactiveThread<>nil do
    Sleep(10);

  // free threads
  CleanTerminatedThreads;
  DoneCriticalsection(FCritSection);
  inherited Destroy;
end;
// Acquires the pool's critical section, which guards the thread and group
// lists of the pool.
procedure TProcThreadPool.EnterPoolCriticalSection;
begin
  System.EnterCriticalsection(FCritSection);
end;
// Releases the pool's critical section again.
procedure TProcThreadPool.LeavePoolCriticalSection;
begin
  System.LeaveCriticalsection(FCritSection);
end;
// Runs the method AMethod for every index StartIndex..EndIndex.
// Does nothing if AMethod is not assigned.
procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(AMethod) then
    DoParallelIntern(AMethod,nil,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Runs the procedure AProc for every index StartIndex..EndIndex.
// Does nothing if AProc is not assigned.
procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(AProc) then
    DoParallelIntern(nil,AProc,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Runs the nested procedure ANested for every index StartIndex..EndIndex.
// Does nothing if ANested is not assigned.
procedure TProcThreadPool.DoParallelNested(const ANested: TMTNestedProcedure;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(ANested) then
    DoParallelIntern(nil,nil,ANested,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Experimental: runs a local procedure, given as a plain code pointer, for
// every index StartIndex..EndIndex. The frame passed along with it is taken
// from the caller of this method, which is why this method must not be
// inlined.
procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
  CallerFrame: Pointer;
begin
  if LocalProc=nil then exit;
  CallerFrame:=get_caller_frame(get_frame);
  DoParallelIntern(nil,TMTProcedure(LocalProc),nil,CallerFrame,
    StartIndex,EndIndex,Data,MaxThreads);
end;
// Splits a loop of LoopLength iterations into equally sized blocks, aiming at
// one block per thread of THIS pool (MaxThreadCount).
//   BlockSize  - iterations per block, at least MinBlockSize and at least 1
//   BlockCount - number of blocks needed to cover LoopLength with BlockSize
// For LoopLength<=0 the result is BlockCount=0, BlockSize=1.
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
  BlockSize: PtrInt; MinBlockSize: PtrInt);
begin
  if LoopLength<=0 then begin
    BlockCount:=0;
    BlockSize:=1;
    exit;
  end;
  // split work into equally sized blocks.
  // Use the thread count of this instance, not of the global ProcThreadPool:
  // the global one may belong to a different pool or be nil.
  // MaxThreadCount is always >=1 (see Create and SetMaxThreadCount).
  BlockSize:=LoopLength div MaxThreadCount;
  if BlockSize<MinBlockSize then
    BlockSize:=MinBlockSize;
  if BlockSize<1 then
    BlockSize:=1;
  BlockCount:=((LoopLength-1) div BlockSize)+1;
end;
// Common implementation of all DoParallel* variants.
// Runs the given task once for every index StartIndex..EndIndex. The calling
// thread (the "starter") takes part in the work; helper threads are taken
// from the pool's inactive threads or created, up to MaxThreads threads in
// total (values <=0 or above MaxThreadCount mean MaxThreadCount).
// Exactly one of AMethod, AProc (optionally with AFrame), ANested is used.
// Returns after all indices have been processed and all helper threads have
// left the group.
// Raises:
//   Exception 'Pool destroyed' if the pool is shutting down;
//   any exception of the task: the starter's own exception propagates
//   directly, an exception of a helper thread is re-raised here at the end.
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
  const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
  const AFrame: Pointer; StartIndex, EndIndex: PtrInt; Data: Pointer;
  MaxThreads: PtrInt);
var
  Group: TProcThreadGroup;
  Index: PtrInt;
  AThread: TProcThread;
  NewThread: Boolean;
  Item: TMultiThreadProcItem;
  HelperThreadException: Exception;
  StarterFinished: Boolean; // true once the starter left its loop normally
begin
  if (StartIndex>EndIndex) then exit; // nothing to do
  if FDestroying then raise Exception.Create('Pool destroyed');

  if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
    MaxThreads:=MaxThreadCount;
  if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
    // single threaded: run all indices in order, with an item without group
    Item:=TMultiThreadProcItem.Create;
    try
      for Index:=StartIndex to EndIndex do begin
        Item.FIndex:=Index;
        if Assigned(AFrame) then
          CallLocalProc(AProc,AFrame,Index,Data,Item)
        else if Assigned(AProc) then
          AProc(Index,Data,Item)
        else if Assigned(AMethod) then
          AMethod(Index,Data,Item)
        else
          ANested(Index,Data,Item);
      end;
    finally
      Item.Free;
    end;
    exit;
  end;

  // create a new group
  Group:=TProcThreadGroup.Create;
  Group.FPool:=Self;
  Group.FTaskData:=Data;
  Group.FTaskMethod:=AMethod;
  Group.FTaskProcedure:=AProc;
  Group.FTaskNested:=ANested;
  Group.FTaskFrame:=AFrame;
  Group.FStartIndex:=StartIndex;
  Group.FEndIndex:=EndIndex;
  Group.FFirstRunningIndex:=StartIndex;
  Group.FLastRunningIndex:=StartIndex;
  Group.FMaxThreads:=MaxThreads;
  Group.FThreadCount:=1; // the starter thread
  Group.FStarterItem.FState:=mtptsActive;
  Group.FStarterItem.FIndex:=StartIndex;
  HelperThreadException:=nil;
  StarterFinished:=false;
  try
    // start threads
    EnterPoolCriticalSection;
    try
      Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
      while Group.NeedMoreThreads do begin
        AThread:=FFirstInactiveThread;
        NewThread:=false;
        if AThread<>nil then begin
          // reuse a parked thread
          AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
        end else if FThreadCount<FMaxThreadCount then begin
          AThread:=TProcThread.Create;
          if Assigned(AThread.FatalException) then
            raise AThread.FatalException;
          NewThread:=true;
          inc(FThreadCount);
        end else begin
          // pool exhausted: go on with the threads we have
          break;
        end;
        // add to Group
        Group.AddThread(AThread);
        // start thread
        AThread.AddToList(FFirstActiveThread,mtptlPool);
        AThread.Item.FState:=mtptsActive;
        if NewThread then
          AThread.Start
        else
          RTLeventSetEvent(AThread.Item.fWaitForPool);
      end;
    finally
      LeavePoolCriticalSection;
    end;

    // run until no more Index left
    // (Index=StartIndex doubles as the "no more work" marker)
    Index:=StartIndex;
    repeat
      Group.FStarterItem.FIndex:=Index;
      Group.Run(Index,Data,Group.FStarterItem);

      EnterPoolCriticalSection;
      try
        Group.IndexComplete(Index);
        if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
        then begin
          inc(Group.FLastRunningIndex);
          Index:=Group.FLastRunningIndex;
          // Publish the new index while still holding the lock. Otherwise
          // there is a window in which the index is already claimed but no
          // item carries it, and HasFinishedIndex would report it as finished.
          Group.FStarterItem.FIndex:=Index;
        end else begin
          Index:=StartIndex;
        end;
      finally
        LeavePoolCriticalSection;
      end;
    until Index=StartIndex;
    StarterFinished:=true;
  finally
    // wait for Group to finish
    if Group.FFirstThread<>nil then begin
      EnterPoolCriticalSection;
      try
        Group.FStarterItem.FState:=mtptsInactive;
        Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex
        // wake threads waiting for starter thread to finish.
        // If the starter was stopped by an exception its index was not
        // completed, so the waiting threads must be woken as failed.
        if not StarterFinished then
          Group.EnterExceptionState(nil)
        else
          Group.WakeThreadsWaitingForIndex;
      finally
        LeavePoolCriticalSection;
      end;
      // waiting with exponential spin lock
      Index:=0;
      while Group.FFirstThread<>nil do begin
        sleep(Index);
        Index:=Index*2+1;
        if Index>30 then Index:=30;
      end;
    end;
    // remove group from pool
    EnterPoolCriticalSection;
    try
      case Group.FState of
      mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
      mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
      end;
    finally
      LeavePoolCriticalSection;
    end;
    HelperThreadException:=Group.FException;
    Group.Free;
    // free terminated threads (terminated, because of exceptions)
    CleanTerminatedThreads;
    // When the starter's own exception is propagating, the code after this
    // finally block is skipped; free the helper's exception here so that it
    // does not leak.
    if not StarterFinished then
      FreeAndNil(HelperThreadException);
  end;
  // if the exception occurred in a helper thread raise it now
  if HelperThreadException<>nil then
    raise HelperThreadException;
end;
initialization
  // set up the main thread's threadvar and the default pool
  CurrentThread:=nil;
  ProcThreadPool:=TProcThreadPool.Create;

finalization
  // shuts down and frees all pool threads
  ProcThreadPool.Free;
  ProcThreadPool:=nil;

end.
|