mtprocs.pas 27 KB


  1. {
  2. **********************************************************************
  3. This file is part of the Free Pascal run time library.
  4. See the file COPYING.FPC, included in this distribution,
  5. for details about the license.
  6. **********************************************************************
  7. Unit for light weight threads.
  8. Copyright (C) 2008 Mattias Gaertner [email protected]
  9. Abstract:
  10. Light weight threads.
  11. This unit provides methods to easily run a procedure/method with several
  12. threads at once.
  13. }
  14. unit MTProcs;
  15. {$mode objfpc}{$H+}
  16. {$inline on}
  17. {$ModeSwitch nestedprocvars}
  18. interface
  19. uses
  20. Classes, SysUtils, MTPCPU;
  21. type
  22. TProcThreadGroup = class;
  23. TProcThreadPool = class;
  24. TProcThread = class;
  25. { TMultiThreadProcItem }
  // State of a TMultiThreadProcItem as tracked by the pool and its groups.
  TMTPThreadState = (
    mtptsNone,            // first value, i.e. the state of a freshly created item
    mtptsActive,          // the item is running (or was just given) an index
    mtptsWaitingForIndex, // the item blocks inside WaitForIndexRange
    mtptsWaitingFailed,   // a wait was aborted (group exception or pool shutdown)
    mtptsInactive,        // the item has no work at the moment
    mtptsTerminated       // the thread went through TProcThread.Terminating
  );
  // Context object passed to every task invocation. It tells the task which
  // index it is working on and lets it wait for lower indices to complete.
  TMultiThreadProcItem = class
  private
    FGroup: TProcThreadGroup;       // nil when running single threaded
    FIndex: PtrInt;                 // index currently assigned to this item
    FThread: TProcThread;           // set by TProcThread.Create, nil otherwise
    FWaitingForIndexEnd: PtrInt;    // last index of the range being waited for
    FWaitingForIndexStart: PtrInt;  // first index of the range being waited for
    fWaitForPool: PRTLEvent;        // event used to park and wake this item
    FState: TMTPThreadState;
  public
    destructor Destroy; override;
    // Blocks until all indices StartIndex..EndIndex have finished.
    // Returns false if the wait is not allowed or was aborted.
    function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
    // Shortcut for WaitForIndexRange(Index,Index).
    function WaitForIndex(Index: PtrInt): boolean; inline;
    // Computes the inclusive bounds of block number Index of a loop.
    procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
      out BlockStart, BlockEnd: PtrInt); inline;
    property Index: PtrInt read FIndex;
    property Group: TProcThreadGroup read FGroup;
    property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
    property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
    property Thread: TProcThread read FThread;
  end;
  55. { TProcThread }
  // Selects which of the two intrusive lists a TProcThread link belongs to.
  TMTPThreadList = (
    mtptlPool,  // one of the pool lists (active, inactive or terminated)
    mtptlGroup  // the helper thread list of a TProcThreadGroup
  );
  // Helper thread of the pool. It owns one TMultiThreadProcItem and is linked
  // into one pool list and (while working) one group list at the same time.
  TProcThread = class(TThread)
  private
    FItem: TMultiThreadProcItem;
    // doubly linked list pointers, one pair per list type
    FNext, FPrev: array[TMTPThreadList] of TProcThread;
    // Pushes this thread at the front of the list starting at First.
    procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
    // Unlinks this thread from the list starting at First.
    procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
    // Moves this thread to the terminated list of aPool; E is the exception
    // that ended the thread, or nil.
    procedure Terminating(aPool: TProcThreadPool; E: Exception);
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    property Item: TMultiThreadProcItem read FItem;
  end;
  // Task callback signatures. Every flavour receives the index to process,
  // the user supplied Data pointer and the item of the executing thread.

  // method of an object
  TMTMethod = procedure(Index: PtrInt; Data: Pointer;
    Item: TMultiThreadProcItem) of object;
  // plain global procedure
  TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
    Item: TMultiThreadProcItem);
  // nested (local) procedure
  TMTNestedProcedure = procedure(Index: PtrInt; Data: Pointer;
    Item: TMultiThreadProcItem) is nested;
  79. { TProcThreadGroup
  80. Each task creates a new group of threads.
  81. A group can either need more threads or it has finished and waits for its
  82. threads to end.
  83. The thread that created the group is not in the list FFirstThread. }
  // State of a TProcThreadGroup; it also tells in which pool list the group is.
  TMTPGroupState = (
    mtpgsNone,        // not in any pool list
    mtpgsNeedThreads, // the group is waiting for more threads to help
    mtpgsFinishing,   // the group is waiting for its threads to finish
    mtpgsException    // there was an exception => close asap
  );
  // One parallel task: an index range, the callback to run for every index
  // and the helper threads currently working on it.
  TProcThreadGroup = class
  private
    FEndIndex: PtrInt;
    FException: Exception;             // exception stored by EnterExceptionState
    FFirstRunningIndex: PtrInt;
    FFirstThread: TProcThread;         // head of the helper thread list
    FLastRunningIndex: PtrInt;         // highest index handed out so far
    FMaxThreads: PtrInt;
    FNext, FPrev: TProcThreadGroup;    // links of the pool's group lists
    FPool: TProcThreadPool;
    FStarterItem: TMultiThreadProcItem; // item of the thread that created the group
    FStartIndex: PtrInt;
    FState: TMTPGroupState;
    FTaskData: Pointer;
    FTaskFrame: Pointer;
    FTaskMethod: TMTMethod;
    FTaskNested: TMTNestedProcedure;
    FTaskProcedure: TMTProcedure;
    FThreadCount: PtrInt;
    procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
    procedure RemoveFromList(var First: TProcThreadGroup); inline;
    function NeedMoreThreads: boolean; inline;
    procedure IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
    procedure AddThread(AThread: TProcThread);
    procedure RemoveThread(AThread: TProcThread); inline;
    procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
    procedure IndexComplete(Index: PtrInt);
    procedure WakeThreadsWaitingForIndex;
    function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
    procedure EnterExceptionState(E: Exception);
  public
    constructor Create;
    destructor Destroy; override;
    property Pool: TProcThreadPool read FPool;
    property StartIndex: PtrInt read FStartIndex;
    property EndIndex: PtrInt read FEndIndex;
    property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
    property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
    property TaskData: Pointer read FTaskData;
    property TaskMethod: TMTMethod read FTaskMethod;
    property TaskNested: TMTNestedProcedure read FTaskNested;
    property TaskProcedure: TMTProcedure read FTaskProcedure;
    property TaskFrame: Pointer read FTaskFrame;
    property MaxThreads: PtrInt read FMaxThreads;
    property StarterItem: TMultiThreadProcItem read FStarterItem;
  end;
  136. { TLightWeightThreadPool
  137. Group 0 are the inactive threads }
  138. { TProcThreadPool }
  // Pool of helper threads. DoParallel* runs a callback for every index of a
  // range, using the calling thread plus helper threads of the pool.
  TProcThreadPool = class
  private
    FMaxThreadCount: PtrInt;
    FThreadCount: PtrInt;                      // helper threads created so far
    FFirstInactiveThread: TProcThread;         // threads waiting for work
    FFirstActiveThread: TProcThread;           // threads assigned to a group
    FFirstTerminatedThread: TProcThread;       // threads waiting to be freed
    FFirstGroupNeedThreads: TProcThreadGroup;  // groups in state mtpgsNeedThreads
    FFirstGroupFinishing: TProcThreadGroup;    // groups in state mtpgsFinishing
    FCritSection: TRTLCriticalSection;         // protects the lists above
    FDestroying: boolean;                      // set at the start of Destroy
    procedure SetMaxThreadCount(const AValue: PtrInt);
    procedure CleanTerminatedThreads;
    procedure DoParallelIntern(const AMethod: TMTMethod;
      const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
      const AFrame: Pointer; StartIndex, EndIndex: PtrInt;
      Data: Pointer = nil; MaxThreads: PtrInt = 0);
  public
    // for debugging only: the critical section is public:
    procedure EnterPoolCriticalSection; inline;
    procedure LeavePoolCriticalSection; inline;
  public
    constructor Create;
    destructor Destroy; override;
    // run an object method for every index StartIndex..EndIndex
    procedure DoParallel(const AMethod: TMTMethod;
      StartIndex, EndIndex: PtrInt;
      Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
    // run a global procedure for every index StartIndex..EndIndex
    procedure DoParallel(const AProc: TMTProcedure;
      StartIndex, EndIndex: PtrInt;
      Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
    // run a nested procedure for every index StartIndex..EndIndex
    procedure DoParallelNested(const ANested: TMTNestedProcedure;
      StartIndex, EndIndex: PtrInt;
      Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
    // experimental
    procedure DoParallelLocalProc(const LocalProc: Pointer;
      StartIndex, EndIndex: PtrInt;
      Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
    // utility functions for loops:
    procedure CalcBlockSize(LoopLength: PtrInt;
      out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt = 0); inline;
  public
    property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
    property ThreadCount: PtrInt read FThreadCount;
  end;
  183. var
  184. ProcThreadPool: TProcThreadPool = nil;
  185. threadvar
  186. CurrentThread: TThread; // TProcThread sets this, you can set this for your own TThreads descendants
  187. implementation
  188. { TMultiThreadProcItem }
  // Releases the wait event, if one was created, and destroys the item.
  destructor TMultiThreadProcItem.Destroy;
  begin
    if Assigned(fWaitForPool) then
    begin
      RTLeventdestroy(fWaitForPool);
      fWaitForPool:=nil;
    end;
    inherited Destroy;
  end;
  // Blocks the calling task until all indices StartIndex..EndIndex of the
  // group have finished.
  // Returns true if the range is finished (or empty, or there is no group),
  // false if EndIndex is not below the own index, the pool is being destroyed,
  // the group is in exception state, or the wait was aborted.
  function TMultiThreadProcItem.WaitForIndexRange(
    StartIndex, EndIndex: PtrInt): boolean;
  var
    OwnerPool: TProcThreadPool;
  begin
    // only indices below the own index may be waited for
    if EndIndex>=Index then
      exit(false);
    // empty range, or a single threaded run (which has no group object)
    if (EndIndex<StartIndex) or (Group=nil) then
      exit(true);

    // multi threaded group
    OwnerPool:=Group.Pool;
    if OwnerPool.FDestroying then
      exit(false); // no more wait allowed

    OwnerPool.EnterPoolCriticalSection;
    try
      if Group.FState=mtpgsException then
        exit(false); // group is closing because of an error
      if Group.HasFinishedIndex(StartIndex,EndIndex) then
        exit(true);  // range already finished
      // register the wait; the event is created on first use
      FWaitingForIndexStart:=StartIndex;
      FWaitingForIndexEnd:=EndIndex;
      FState:=mtptsWaitingForIndex;
      if not Assigned(fWaitForPool) then
        fWaitForPool:=RTLEventCreate;
      RTLeventResetEvent(fWaitForPool);
    finally
      OwnerPool.LeavePoolCriticalSection;
    end;

    // sleep until somebody sets the event; the waker stores the outcome in FState
    RTLeventWaitFor(fWaitForPool);
    Result:=(FState=mtptsActive);
    FState:=mtptsActive;
  end;
  // Waits for a single index; see WaitForIndexRange for the result.
  function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
  begin
    exit(WaitForIndexRange(Index,Index));
  end;
  // Computes the inclusive bounds of block number Index when a loop of
  // LoopLength iterations is cut into blocks of BlockSize iterations.
  // BlockStart = BlockSize*Index, BlockEnd is clipped to LoopLength-1.
  procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
    out BlockStart, BlockEnd: PtrInt);
  var
    EndExclusive: PtrInt;
  begin
    BlockStart:=BlockSize*Index;
    EndExclusive:=BlockStart+BlockSize;
    // the last block may be shorter
    if EndExclusive>LoopLength then
      EndExclusive:=LoopLength;
    BlockEnd:=EndExclusive-1;
  end;
  246. { TProcThread }
  // Inserts this thread at the front of the list whose head is First.
  procedure TProcThread.AddToList(var First: TProcThread;
    ListType: TMTPThreadList);
  var
    OldHead: TProcThread;
  begin
    OldHead:=First;
    FNext[ListType]:=OldHead;
    if Assigned(OldHead) then
      OldHead.FPrev[ListType]:=Self;
    First:=Self;
  end;
  // Unlinks this thread from the list whose head is First and clears its links.
  procedure TProcThread.RemoveFromList(var First: TProcThread;
    ListType: TMTPThreadList);
  var
    Successor, Predecessor: TProcThread;
  begin
    Successor:=FNext[ListType];
    Predecessor:=FPrev[ListType];
    // removing the head: the successor becomes the new head
    if First=Self then
      First:=Successor;
    if Assigned(Successor) then
      Successor.FPrev[ListType]:=Predecessor;
    if Assigned(Predecessor) then
      Predecessor.FNext[ListType]:=Successor;
    FNext[ListType]:=nil;
    FPrev[ListType]:=nil;
  end;
  // Called at the end of Execute. Detaches the thread from its group (putting
  // the group into exception state with E) and moves the thread to the pool's
  // list of terminated threads. Runs under the pool's critical section.
  procedure TProcThread.Terminating(aPool: TProcThreadPool;
    E: Exception);
  var
    OwnGroup: TProcThreadGroup;
  begin
    aPool.EnterPoolCriticalSection;
    try
      // still in a group => the thread ends because of an exception
      OwnGroup:=Item.FGroup;
      if Assigned(OwnGroup) then
      begin
        OwnGroup.EnterExceptionState(E);
        OwnGroup.RemoveThread(Self);
        Item.FGroup:=nil;
      end;
      // leave the pool list the thread is currently in ...
      if Item.FState=mtptsActive then
        RemoveFromList(aPool.FFirstActiveThread,mtptlPool)
      else if Item.FState=mtptsInactive then
        RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
      // ... and join the terminated threads
      AddToList(aPool.FFirstTerminatedThread,mtptlPool);
      Item.FState:=mtptsTerminated;
    finally
      aPool.LeavePoolCriticalSection;
    end;
  end;
  // Creates the thread suspended, together with its item and wait event.
  constructor TProcThread.Create;
  begin
    inherited Create(true); // true = create suspended
    FItem:=TMultiThreadProcItem.Create;
    FItem.FThread:=Self;
    FItem.fWaitForPool:=RTLEventCreate;
  end;
  // Frees the item owned by this thread, then the thread itself.
  destructor TProcThread.Destroy;
  begin
    FreeAndNil(FItem);
    inherited Destroy;
  end;
  // Thread main loop: run the assigned index, then fetch the next index of the
  // group, switch to another group that needs threads, or park as inactive
  // until woken. The loop ends when the thread is woken without a group.
  // An exception raised by the task is captured and handed to Terminating.
  procedure TProcThread.Execute;
  var
    OwnerPool: TProcThreadPool;
    CurGroup: TProcThreadGroup;
    Finished: Boolean;
    Caught: Exception;
  begin
    MTProcs.CurrentThread:=Self;
    OwnerPool:=Item.Group.Pool;
    Finished:=false;
    try
      repeat
        // work
        CurGroup:=Item.Group;
        CurGroup.Run(Item.Index,CurGroup.TaskData,Item);

        OwnerPool.EnterPoolCriticalSection;
        try
          CurGroup.IndexComplete(Item.Index);
          // find next work
          if CurGroup.LastRunningIndex>=CurGroup.EndIndex then
          begin
            // no index left to start in this group: leave it
            RemoveFromList(CurGroup.FFirstThread,mtptlGroup);
            dec(CurGroup.FThreadCount);
            Item.FGroup:=nil;
            if OwnerPool.FFirstGroupNeedThreads=nil then
            begin
              // nobody needs help: mark inactive
              RemoveFromList(OwnerPool.FFirstActiveThread,mtptlPool);
              AddToList(OwnerPool.FFirstInactiveThread,mtptlPool);
              Item.FState:=mtptsInactive;
              RTLeventResetEvent(Item.fWaitForPool);
            end
            else
              // help another group
              OwnerPool.FFirstGroupNeedThreads.AddThread(Self);
          end
          else
            // next index of the same group
            CurGroup.IncreaseLastRunningIndex(Item);
        finally
          OwnerPool.LeavePoolCriticalSection;
        end;

        // wait for new work
        if Item.FState=mtptsInactive then
          RTLeventWaitFor(Item.fWaitForPool);
      until Item.Group=nil;
      Finished:=true;
    except
      // stop the exception and store it
      Caught:=Exception(AcquireExceptionObject);
      Terminating(OwnerPool,Caught);
    end;
    if Finished then
      Terminating(OwnerPool,nil);
  end;
  358. { TProcThreadGroup }
  // Inserts the group at the front of the list whose head is First and
  // records the matching state in FState.
  procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
    ListType: TMTPGroupState);
  var
    OldHead: TProcThreadGroup;
  begin
    OldHead:=First;
    FNext:=OldHead;
    if Assigned(OldHead) then
      OldHead.FPrev:=Self;
    First:=Self;
    FState:=ListType;
  end;
  // Unlinks the group from the list whose head is First, clears its links
  // and resets the state to mtpgsNone.
  procedure TProcThreadGroup.RemoveFromList(
    var First: TProcThreadGroup);
  var
    Successor, Predecessor: TProcThreadGroup;
  begin
    Successor:=FNext;
    Predecessor:=FPrev;
    if First=Self then
      First:=Successor;
    if Assigned(Successor) then
      Successor.FPrev:=Predecessor;
    if Assigned(Predecessor) then
      Predecessor.FNext:=Successor;
    FNext:=nil;
    FPrev:=nil;
    FState:=mtpgsNone;
  end;
  // True while there are unstarted indices, the thread limit of the group is
  // not reached and the group is not in exception state.
  function TProcThreadGroup.NeedMoreThreads: boolean;
  begin
    if FState=mtpgsException then
      exit(false);
    Result:=(FThreadCount<FMaxThreads) and (FLastRunningIndex<FEndIndex);
  end;
  // Hands the next unstarted index to Item. When the group no longer needs
  // more threads it moves from the pool's need-threads list to the finishing
  // list.
  procedure TProcThreadGroup.IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
  begin
    inc(FLastRunningIndex);
    Item.FIndex:=FLastRunningIndex;
    if (not NeedMoreThreads) and (FState=mtpgsNeedThreads) then
    begin
      RemoveFromList(Pool.FFirstGroupNeedThreads);
      AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
    end;
  end;
  // Makes AThread a helper of this group and assigns it the next index.
  procedure TProcThreadGroup.AddThread(AThread: TProcThread);
  begin
    // count the thread first: IncreaseLastRunningIndex evaluates FThreadCount
    inc(FThreadCount);
    AThread.AddToList(FFirstThread,mtptlGroup);
    AThread.Item.FGroup:=Self;
    IncreaseLastRunningIndex(AThread.Item);
  end;
  // Takes AThread out of the helper list of this group.
  procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
  begin
    dec(FThreadCount);
    AThread.RemoveFromList(FFirstThread,mtptlGroup);
  end;
  // Invokes the task callback of the group for one index. The callback kind
  // is chosen in this order: local procedure with frame, global procedure,
  // nested procedure, object method.
  procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
    Item: TMultiThreadProcItem); inline;
  begin
    if Assigned(FTaskFrame) then
    begin
      CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item);
      exit;
    end;
    if Assigned(FTaskProcedure) then
    begin
      FTaskProcedure(Index,Data,Item);
      exit;
    end;
    if Assigned(FTaskNested) then
    begin
      FTaskNested(Index,Data,Item);
      exit;
    end;
    FTaskMethod(Index,Data,Item);
  end;
  // Called after the task for Index has returned. Recomputes
  // FFirstRunningIndex as the minimum of the starter's index and the indices
  // of all helper threads other than Index, then wakes waiting threads.
  procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
  var
    Walker: TProcThread;
    Lowest, Candidate: PtrInt;
  begin
    // update FirstRunningIndex
    Lowest:=FStarterItem.Index;
    Walker:=FFirstThread;
    while Assigned(Walker) do
    begin
      Candidate:=Walker.Item.Index;
      if (Candidate<>Index) and (Candidate<Lowest) then
        Lowest:=Candidate;
      Walker:=Walker.FNext[mtptlGroup];
    end;
    FFirstRunningIndex:=Lowest;
    // wake up threads (Note: do this even if FFirstRunningIndex has not changed)
    WakeThreadsWaitingForIndex;
  end;
  // Wakes items that block in WaitForIndexRange. In normal operation only
  // items whose awaited range has finished are woken (state mtptsActive).
  // In exception state every waiting item is woken with mtptsWaitingFailed.
  // The helper threads are handled first, the starter item last.
  procedure TProcThreadGroup.WakeThreadsWaitingForIndex;
  var
    Walker: TProcThread;
    Failing: Boolean;

    // Wakes WaitItem if it is waiting and (unless failing) its range is done.
    procedure Wake(WaitItem: TMultiThreadProcItem);
    begin
      if WaitItem.FState<>mtptsWaitingForIndex then
        exit;
      if Failing then
        // end group: the wait fails
        WaitItem.FState:=mtptsWaitingFailed
      else if HasFinishedIndex(WaitItem.WaitingForIndexStart,
                               WaitItem.WaitingForIndexEnd) then
        WaitItem.FState:=mtptsActive
      else
        exit; // range not finished yet, keep waiting
      RTLeventSetEvent(WaitItem.fWaitForPool);
    end;

  begin
    Failing:=(FState=mtpgsException);
    Walker:=FFirstThread;
    while Assigned(Walker) do
    begin
      Wake(Walker.Item);
      Walker:=Walker.FNext[mtptlGroup];
    end;
    // the starter thread of this group is not in the list
    Wake(FStarterItem);
  end;
  // Returns true if no index of aStartIndex..aEndIndex is still running or
  // unstarted.
  function TProcThreadGroup.HasFinishedIndex(
    aStartIndex, aEndIndex: PtrInt): boolean;
  var
    Walker: TProcThread;
    Running: PtrInt;
  begin
    // everything below the first running index is finished
    if aEndIndex<FFirstRunningIndex then
      exit(true);
    // everything above the last started index has not even begun
    if aEndIndex>FLastRunningIndex then
      exit(false);
    // in between: the range is unfinished if any helper works inside it ...
    Walker:=FFirstThread;
    while Assigned(Walker) do
    begin
      Running:=Walker.Item.Index;
      if (Running>=aStartIndex) and (Running<=aEndIndex) then
        exit(false);
      Walker:=Walker.FNext[mtptlGroup];
    end;
    // ... or the starter does
    Running:=FStarterItem.Index;
    Result:=(Running<aStartIndex) or (Running>aEndIndex);
  end;
  // Puts the group into exception state: removes it from the pool list it is
  // in, stores E in FException and wakes all waiting items (their wait fails).
  // E may be nil. The group takes ownership of E: only the first exception is
  // kept, and an exception passed while the group is already in exception
  // state is freed here, because nobody else would free or raise it.
  procedure TProcThreadGroup.EnterExceptionState(E: Exception);
  begin
    if FState=mtpgsException then
    begin
      // already failing: drop the additional exception instead of leaking it
      if E<>FException then
        E.Free; // Free is nil-safe
      exit;
    end;
    case FState of
    mtpgsFinishing: RemoveFromList(Pool.FFirstGroupFinishing);
    mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
    end;
    // RemoveFromList resets FState to mtpgsNone, so set the state afterwards
    FState:=mtpgsException;
    FException:=E;
    WakeThreadsWaitingForIndex;
  end;
  // Creates the group together with the item of its starter thread.
  constructor TProcThreadGroup.Create;
  var
    Starter: TMultiThreadProcItem;
  begin
    Starter:=TMultiThreadProcItem.Create;
    Starter.FGroup:=Self;
    FStarterItem:=Starter;
  end;
  // Frees the starter item owned by the group.
  destructor TProcThreadGroup.Destroy;
  begin
    FreeAndNil(FStarterItem);
    inherited Destroy;
  end;
  526. { TProcThreadPool }
  // Setter of MaxThreadCount. Raises an Exception for values below 1.
  procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
  begin
    if AValue<>FMaxThreadCount then
    begin
      if AValue<1 then
        raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
      FMaxThreadCount:=AValue;
    end;
  end;
  // Frees all threads in the terminated list.
  // Each freed thread is subtracted from FThreadCount, otherwise the pool
  // could never create a replacement and would permanently lose capacity
  // after a helper thread ended (e.g. because of an exception in a task).
  // The list and the counter are only touched under the pool lock, because
  // TProcThread.Terminating modifies the same list from other threads.
  procedure TProcThreadPool.CleanTerminatedThreads;
  var
    Dead: TProcThread;
  begin
    repeat
      EnterPoolCriticalSection;
      try
        Dead:=FFirstTerminatedThread;
        if Dead<>nil then
        begin
          Dead.RemoveFromList(FFirstTerminatedThread,mtptlPool);
          dec(FThreadCount);
        end;
      finally
        LeavePoolCriticalSection;
      end;
      if Dead=nil then
        break;
      // free outside the lock: destroying a TThread may have to wait for the
      // thread to leave Execute
      Dead.Free;
    until false;
  end;
  // Creates the pool. MaxThreadCount defaults to GetSystemThreadCount, but
  // is at least 1.
  constructor TProcThreadPool.Create;
  var
    Detected: PtrInt;
  begin
    Detected:=GetSystemThreadCount;
    if Detected<1 then
      Detected:=1;
    FMaxThreadCount:=Detected;
    InitCriticalSection(FCritSection);
  end;
  // Shuts the pool down: aborts all index waits, waits until no thread is
  // active, wakes the inactive threads so they leave their loop, waits until
  // they are gone and finally frees the terminated threads.
  destructor TProcThreadPool.Destroy;

    // Lets a pending WaitForIndexRange of WaitItem fail.
    procedure AbortWait(WaitItem: TMultiThreadProcItem);
    begin
      if WaitItem.FState<>mtptsWaitingForIndex then
        exit;
      WaitItem.FState:=mtptsWaitingFailed;
      RTLeventSetEvent(WaitItem.fWaitForPool);
    end;

    // Aborts the waits of the starter items of a whole group list.
    procedure AbortStarterWaits(FirstGroup: TProcThreadGroup);
    var
      CurGroup: TProcThreadGroup;
    begin
      CurGroup:=FirstGroup;
      while Assigned(CurGroup) do
      begin
        AbortWait(CurGroup.StarterItem);
        CurGroup:=CurGroup.FNext;
      end;
    end;

  var
    Walker: TProcThread;
  begin
    FDestroying:=true;

    // wake up all waiting threads
    EnterPoolCriticalSection;
    try
      Walker:=FFirstActiveThread;
      while Assigned(Walker) do
      begin
        AbortWait(Walker.Item);
        Walker:=Walker.FNext[mtptlPool];
      end;
      AbortStarterWaits(FFirstGroupNeedThreads);
      AbortStarterWaits(FFirstGroupFinishing);
    finally
      LeavePoolCriticalSection;
    end;

    // wait for all active threads to become inactive
    while Assigned(FFirstActiveThread) do
      Sleep(10);

    // wake up all inactive threads (without new work they will terminate)
    EnterPoolCriticalSection;
    try
      Walker:=FFirstInactiveThread;
      while Assigned(Walker) do
      begin
        RTLeventSetEvent(Walker.Item.fWaitForPool);
        Walker:=Walker.FNext[mtptlPool];
      end;
    finally
      LeavePoolCriticalSection;
    end;

    // wait for all threads to terminate
    while Assigned(FFirstInactiveThread) do
      Sleep(10);

    // free threads
    CleanTerminatedThreads;
    DoneCriticalsection(FCritSection);
    inherited Destroy;
  end;
  // Acquires the critical section that protects the pool's lists.
  procedure TProcThreadPool.EnterPoolCriticalSection;
  begin
    EnterCriticalsection(FCritSection);
  end;
  // Releases the critical section acquired by EnterPoolCriticalSection.
  procedure TProcThreadPool.LeavePoolCriticalSection;
  begin
    LeaveCriticalsection(FCritSection);
  end;
  // Runs AMethod for every index StartIndex..EndIndex. Does nothing if
  // AMethod is not assigned.
  procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
    StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
  begin
    if Assigned(AMethod) then
      DoParallelIntern(AMethod,nil,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
  end;
  // Runs AProc for every index StartIndex..EndIndex. Does nothing if AProc
  // is not assigned.
  procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
    StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
  begin
    if Assigned(AProc) then
      DoParallelIntern(nil,AProc,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
  end;
  // Runs the nested procedure ANested for every index StartIndex..EndIndex.
  // Does nothing if ANested is not assigned.
  procedure TProcThreadPool.DoParallelNested(const ANested: TMTNestedProcedure;
    StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
  begin
    if Assigned(ANested) then
      DoParallelIntern(nil,nil,ANested,nil,StartIndex,EndIndex,Data,MaxThreads);
  end;
  // Experimental: runs a local procedure, given as plain code pointer, for
  // every index StartIndex..EndIndex. The frame pointer of the caller is
  // captured here and passed along with the code pointer, therefore this
  // routine must not be inlined.
  procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
    StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
  var
    CallerFrame: Pointer;
  begin
    if LocalProc=nil then
      exit;
    CallerFrame:=get_caller_frame(get_frame);
    DoParallelIntern(nil,TMTProcedure(LocalProc),nil,CallerFrame,
      StartIndex,EndIndex,Data,MaxThreads);
  end;
  // Splits a loop of LoopLength iterations into equally sized blocks, one per
  // thread of THIS pool if possible.
  //   LoopLength   - number of iterations; <=0 yields BlockCount=0, BlockSize=1
  //   BlockCount   - resulting number of blocks
  //   BlockSize    - resulting iterations per block (the last may be shorter)
  //   MinBlockSize - lower limit for BlockSize; values below 1 act like 1
  // The thread count is taken from this instance, not from the global
  // ProcThreadPool, so the result is right for separately created pools and
  // the method does not depend on the global variable being assigned.
  procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
    BlockSize: PtrInt; MinBlockSize: PtrInt);
  begin
    if LoopLength<=0 then
    begin
      BlockCount:=0;
      BlockSize:=1;
      exit;
    end;
    // split work into equally sized blocks
    // (MaxThreadCount is at least 1: see Create and SetMaxThreadCount)
    BlockSize:=LoopLength div MaxThreadCount;
    if BlockSize<MinBlockSize then
      BlockSize:=MinBlockSize;
    if BlockSize<1 then
      BlockSize:=1;
    // round up, so that BlockCount*BlockSize covers the whole loop
    BlockCount:=((LoopLength-1) div BlockSize)+1;
  end;
  // Common implementation of all DoParallel* variants.
  // Exactly one of AMethod, AProc (optionally with AFrame) and ANested is
  // expected to be assigned. The callback is run for every index
  // StartIndex..EndIndex; the calling thread (the "starter") takes part in the
  // work and returns after all indices have been processed.
  //   MaxThreads - upper limit of threads for this call; <=0 or above
  //                MaxThreadCount means MaxThreadCount
  // Raises Exception if the pool is being destroyed. An exception raised by
  // the callback in the starter propagates to the caller; an exception raised
  // in a helper thread is re-raised here after the group has finished.
  procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
    const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
    const AFrame: Pointer; StartIndex, EndIndex: PtrInt; Data: Pointer;
    MaxThreads: PtrInt);
  var
    Group: TProcThreadGroup;
    Index: PtrInt;
    AThread: TProcThread;
    NewThread: Boolean;
    Item: TMultiThreadProcItem;
    HelperThreadException: Exception;
    StarterOk: Boolean; // true once the starter finished its loop normally
  begin
    if (StartIndex>EndIndex) then exit; // nothing to do
    if FDestroying then raise Exception.Create('Pool destroyed');

    if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
      MaxThreads:=MaxThreadCount;
    if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
      // single threaded: no group, the item has Group=nil
      Item:=TMultiThreadProcItem.Create;
      try
        for Index:=StartIndex to EndIndex do begin
          Item.FIndex:=Index;
          if Assigned(AFrame) then
            CallLocalProc(AProc,AFrame,Index,Data,Item)
          else if Assigned(AProc) then
            AProc(Index,Data,Item)
          else if Assigned(AMethod) then
            AMethod(Index,Data,Item)
          else
            ANested(Index,Data,Item);
        end;
      finally
        Item.Free;
      end;
      exit;
    end;

    // create a new group
    Group:=TProcThreadGroup.Create;
    Group.FPool:=Self;
    Group.FTaskData:=Data;
    Group.FTaskMethod:=AMethod;
    Group.FTaskProcedure:=AProc;
    Group.FTaskNested:=ANested;
    Group.FTaskFrame:=AFrame;
    Group.FStartIndex:=StartIndex;
    Group.FEndIndex:=EndIndex;
    Group.FFirstRunningIndex:=StartIndex;
    Group.FLastRunningIndex:=StartIndex;
    Group.FMaxThreads:=MaxThreads;
    Group.FThreadCount:=1; // the starter thread
    Group.FStarterItem.FState:=mtptsActive;
    Group.FStarterItem.FIndex:=StartIndex;
    HelperThreadException:=nil;
    StarterOk:=false;
    try
      // start threads
      EnterPoolCriticalSection;
      try
        Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
        while Group.NeedMoreThreads do begin
          AThread:=FFirstInactiveThread;
          NewThread:=false;
          if AThread<>nil then begin
            // reuse a parked thread
            AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
          end else if FThreadCount<FMaxThreadCount then begin
            AThread:=TProcThread.Create;
            if Assigned(AThread.FatalException) then
              raise AThread.FatalException;
            NewThread:=true;
            inc(FThreadCount);
          end else begin
            break;
          end;
          // add to Group
          Group.AddThread(AThread);
          // start thread
          AThread.AddToList(FFirstActiveThread,mtptlPool);
          AThread.Item.FState:=mtptsActive;
          if NewThread then
            AThread.Start
          else
            RTLeventSetEvent(AThread.Item.fWaitForPool);
        end;
      finally
        LeavePoolCriticalSection;
      end;

      // run until no more Index left
      Index:=StartIndex;
      repeat
        Group.FStarterItem.FIndex:=Index;
        Group.Run(Index,Data,Group.FStarterItem);

        EnterPoolCriticalSection;
        try
          Group.IndexComplete(Index);
          if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
          then begin
            inc(Group.FLastRunningIndex);
            Index:=Group.FLastRunningIndex;
          end else begin
            // StartIndex is used as "no more work" marker
            Index:=StartIndex;
          end;
        finally
          LeavePoolCriticalSection;
        end;
      until Index=StartIndex;
      StarterOk:=true;
    finally
      // wait for Group to finish
      if Group.FFirstThread<>nil then begin
        EnterPoolCriticalSection;
        try
          Group.FStarterItem.FState:=mtptsInactive;
          Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex
          // wake threads waiting for starter thread to finish
          if not StarterOk then
            // the starter was stopped by an exception: its index never
            // completed, so waits of the helpers must fail
            Group.EnterExceptionState(nil)
          else
            Group.WakeThreadsWaitingForIndex;
        finally
          LeavePoolCriticalSection;
        end;
        // waiting with exponential spin lock
        Index:=0;
        while Group.FFirstThread<>nil do begin
          sleep(Index);
          Index:=Index*2+1;
          if Index>30 then Index:=30;
        end;
      end;
      // remove group from pool
      EnterPoolCriticalSection;
      try
        case Group.FState of
        mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
        mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
        end;
      finally
        LeavePoolCriticalSection;
      end;
      HelperThreadException:=Group.FException;
      Group.Free;
      // free terminated threads (terminated, because of exceptions)
      CleanTerminatedThreads;
      // if the starter's own exception is propagating, the raise below is
      // never reached: free the stored helper exception instead of leaking it
      if not StarterOk then
        FreeAndNil(HelperThreadException);
    end;
    // if the exception occured in a helper thread raise it now
    if HelperThreadException<>nil then
      raise HelperThreadException;
  end;
  initialization
    // no TProcThread is running in the main thread
    CurrentThread:=nil;
    // create the global default pool
    ProcThreadPool:=TProcThreadPool.Create;

  finalization
    // shut down the global pool; the variable is cleared after the pool is gone
    ProcThreadPool.Free;
    ProcThreadPool:=nil;
  end.