mtputils.pas 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. {
  2. **********************************************************************
  3. This file is part of the Free Pascal run time library.
  4. See the file COPYING.FPC, included in this distribution,
  5. for details about the license.
  6. **********************************************************************
  7. Utilities using light weight threads.
  8. Copyright (C) 2008 Mattias Gaertner [email protected]
  9. Abstract:
  10. Utility functions using mtprocs.
  11. For example a parallel sort.
  12. }
  13. unit MTPUtils;
  14. {$mode objfpc}{$H+}
  15. interface
  16. uses
  17. Classes, SysUtils, MTProcs;
  18. type
  19. TSortPartEvent = procedure(aList: PPointer; aCount: PtrInt);
  20. { TParallelSortPointerList }
  21. TParallelSortPointerList = class
  22. protected
  23. fBlockSize: PtrInt;
  24. fBlockCntPowOf2Offset: PtrInt;
  25. FMergeBuffer: PPointer;
  26. procedure MTPSort(Index: PtrInt; {%H-}Data: Pointer; Item: TMultiThreadProcItem);
  27. public
  28. List: PPointer;
  29. Count: PtrInt;
  30. Compare: TListSortCompare;
  31. BlockCnt: PtrInt;
  32. OnSortPart: TSortPartEvent;
  33. constructor Create(aList: PPointer; aCount: PtrInt; const aCompare: TListSortCompare;
  34. MaxThreadCount: integer = 0);
  35. procedure Sort;
  36. end;
  37. { Sort a list in parallel using merge sort.
  38. You must provide a compare function.
  39. You can provide your own sort function for the blocks which are sorted in a
  40. single thread, for example a normal quicksort. }
  41. procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
  42. MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
  43. implementation
  44. procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
  45. MaxThreadCount: integer; const OnSortPart: TSortPartEvent);
  46. var
  47. Sorter: TParallelSortPointerList;
  48. begin
  49. if List.Count<=1 then exit;
  50. Sorter:=TParallelSortPointerList.Create(@List.List[0],List.Count,Compare,
  51. MaxThreadCount);
  52. try
  53. Sorter.OnSortPart:=OnSortPart;
  54. Sorter.Sort;
  55. finally
  56. Sorter.Free;
  57. end;
  58. end;
  59. { TParallelSortPointerList }
  60. procedure TParallelSortPointerList.MTPSort(Index: PtrInt; Data: Pointer;
  61. Item: TMultiThreadProcItem);
  62. procedure MergeSort(L, M, R: PtrInt; Recursive: boolean);
  63. var
  64. Src1: PtrInt;
  65. Src2: PtrInt;
  66. Dest1: PtrInt;
  67. begin
  68. if R-L<=1 then begin
  69. // sort lists of 1 and 2 items directly
  70. if L<R then begin
  71. if Compare(List[L],List[R])>0 then begin
  72. FMergeBuffer[L]:=List[L];
  73. List[L]:=List[R];
  74. List[R]:=FMergeBuffer[L];
  75. end;
  76. end;
  77. exit;
  78. end;
  79. // sort recursively
  80. if Recursive then begin
  81. MergeSort(L,(L+M) div 2,M-1,true);
  82. MergeSort(M,(M+R+1) div 2,R,true);
  83. end;
  84. // merge both blocks
  85. Src1:=L;
  86. Src2:=M;
  87. Dest1:=L;
  88. repeat
  89. if (Src1<M)
  90. and ((Src2>R) or (Compare(List[Src1],List[Src2])<=0)) then begin
  91. FMergeBuffer[Dest1]:=List[Src1];
  92. inc(Dest1);
  93. inc(Src1);
  94. end else if (Src2<=R) then begin
  95. FMergeBuffer[Dest1]:=List[Src2];
  96. inc(Dest1);
  97. inc(Src2);
  98. end else
  99. break;
  100. until false;
  101. // write the mergebuffer back
  102. Src1:=L;
  103. Dest1:=l;
  104. while Src1<=R do begin
  105. List[Dest1]:=FMergeBuffer[Src1];
  106. inc(Src1);
  107. inc(Dest1);
  108. end;
  109. end;
  110. var
  111. L, M, R: PtrInt;
  112. i: integer;
  113. NormIndex: Integer;
  114. Range: integer;
  115. MergeIndex: Integer;
  116. begin
  117. L:=fBlockSize*Index;
  118. R:=L+fBlockSize-1;
  119. if R>=Count then
  120. R:=Count-1; // last block
  121. //WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' sort block: ',L,' ',(L+R+1) div 2,' ',R);
  122. if Assigned(OnSortPart) then
  123. OnSortPart(@List[L],R-L+1)
  124. else
  125. MergeSort(L,(L+R+1) div 2,R,true);
  126. // merge
  127. // 0 1 2 3 4 5 6 7
  128. // \/ \/ \/ \/
  129. // \/ \/
  130. // \/
  131. // For example: BlockCnt = 5 => Index in 0..4
  132. // fBlockCntPowOf2Offset = 3 (=8-5)
  133. // NormIndex = Index + 3 => NormIndex in 3..7
  134. NormIndex:=Index+fBlockCntPowOf2Offset;
  135. i:=0;
  136. repeat
  137. Range:=1 shl i;
  138. if NormIndex and Range=0 then break;
  139. // merge left and right block(s)
  140. MergeIndex:=NormIndex-Range-fBlockCntPowOf2Offset;
  141. if (MergeIndex+Range-1>=0) then begin
  142. // wait until left blocks have finished
  143. //WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' wait for block ',MergeIndex);
  144. if (MergeIndex>=0) and (not Item.WaitForIndex(MergeIndex)) then exit;
  145. // compute left and right block bounds
  146. M:=L;
  147. L:=(MergeIndex-Range+1)*fBlockSize;
  148. if L<0 then L:=0;
  149. //WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' merge blocks ',L,' ',M,' ',R);
  150. MergeSort(L,M,R,false);
  151. end;
  152. inc(i);
  153. until false;
  154. //WriteLn('TParallelSortPointerList.LWTSort END Index='+IntToStr(Index));
  155. end;
  156. constructor TParallelSortPointerList.Create(aList: PPointer; aCount: PtrInt;
  157. const aCompare: TListSortCompare; MaxThreadCount: integer);
  158. begin
  159. List:=aList;
  160. Count:=aCount;
  161. Compare:=aCompare;
  162. BlockCnt:=Count div 100; // at least 100 items per thread
  163. if BlockCnt>ProcThreadPool.MaxThreadCount then
  164. BlockCnt:=ProcThreadPool.MaxThreadCount;
  165. if (MaxThreadCount>0) and (BlockCnt>MaxThreadCount) then
  166. BlockCnt:=MaxThreadCount;
  167. if BlockCnt<1 then BlockCnt:=1;
  168. end;
  169. procedure TParallelSortPointerList.Sort;
  170. begin
  171. if (Count<=1) then exit;
  172. fBlockSize:=(Count+BlockCnt-1) div BlockCnt;
  173. fBlockCntPowOf2Offset:=1;
  174. while fBlockCntPowOf2Offset<BlockCnt do
  175. fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset*2;
  176. fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset-BlockCnt;
  177. //WriteLn('TParallelSortPointerList.Sort BlockCnt=',BlockCnt,' fBlockSize=',fBlockSize,' fBlockCntPowOf2Offset=',fBlockCntPowOf2Offset);
  178. GetMem(FMergeBuffer,SizeOf(Pointer)*Count);
  179. try
  180. ProcThreadPool.DoParallel(@MTPSort,0,BlockCnt-1);
  181. finally
  182. FreeMem(FMergeBuffer);
  183. FMergeBuffer:=nil;
  184. end;
  185. end;
  186. end.