lzxcompressthread.pas 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. { Copyright (C) <2009> <Andrew Haines> lzxcompressthread.pas
  2. This library is free software; you can redistribute it and/or modify it
  3. under the terms of the GNU Library General Public License as published by
  4. the Free Software Foundation; either version 2 of the License, or (at your
  5. option) any later version.
  6. This program is distributed in the hope that it will be useful, but WITHOUT
  7. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  8. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License
  9. for more details.
  10. You should have received a copy of the GNU Library General Public License
  11. along with this library; if not, write to the Free Software Foundation,
  12. Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  13. }
  14. {
  15. See the file COPYING.FPC, included in this distribution,
  16. for details about the copyright.
  17. }
  18. unit lzxcompressthread;
  19. {$mode objfpc}{$H+}
  20. interface
  21. uses
  22. Classes, paslzxcomp;
  type
    { Forward declarations: the three classes below refer to one another. }
    TLZXCompressor = class;
    TLZXMasterThread = class;
    TLZXWorkerThread = class;

    { Asks the client for up to WantedByteCount bytes to be stored in Buffer.
      The result is stored as the data size of the block being queued. }
    TLZXGetDataMethod = function(Sender: TLZXCompressor;
      WantedByteCount: Integer; Buffer: Pointer): Integer of object;

    { Asks the client whether the input is exhausted. }
    TLZXIsEndOfFileMethod = function(Sender: TLZXCompressor): Boolean of object;

    { Hands one finished block to the client. }
    TLZXChunkDoneMethod = procedure(Sender: TLZXCompressor;
      CompressedSize: Integer; UncompressedSize: Integer;
      Buffer: Pointer) of object;

    { Reports the running compressed/uncompressed totals at a frame boundary. }
    TLZXMarkFrameMethod = procedure(Sender: TLZXCompressor;
      CompressedSize: Integer; UncompressedSize: Integer) of object;

    PLZXFinishedBlock = ^TLZXFinishedBlock;

    { Result of compressing one input block, as produced by a worker thread. }
    TLZXFinishedBlock = record
      CompressedSize: Integer;    // number of compressed bytes stored in Data
      UnCompressedSize: Integer;  // number of input bytes the block was built from
      Frame1CSize: Integer;       // compressed size reported for the first frame
      Frame1USize: Integer;       // uncompressed size reported for the first frame
      Frame2CSize: Integer;       // second frame mark minus first (compressed)
      Frame2USize: Integer;       // second frame mark minus first (uncompressed)
      Index: Integer;             // block number, used to restore input order
      Data: Pointer;              // buffer holding the compressed bytes
    end;
  { TLZXCompressor }

    { Public entry point of the unit. It owns a master thread, pulls input
      through the OnGetData / OnIsEndOfFile callbacks and hands finished
      blocks back, ordered by block index, through OnMarkFrame / OnChunkDone. }
    TLZXCompressor = class(TObject)
    private
      FOnMarkFrame: TLZXMarkFrameMethod;
      FThreadCount: Integer;            // number of worker threads the master creates
      FBlockSize: Integer;              // bytes requested from OnGetData per block
      FTotalCompressedSize: DWord;      // running total passed to OnMarkFrame
      FTotalUnCompressedSize: DWord;    // running total passed to OnMarkFrame
      FOnChunkDone: TLZXChunkDoneMethod;
      FOnGetData: TLZXGetDataMethod;
      FOnIsEndOfFile: TLZXIsEndOfFileMethod;
      FWindowSizeCode: Integer;         // handed to every worker on creation
      FinishedBlocks: TFPList;          // blocks that finished ahead of their turn
      NextBlockNeeded: Integer;         // index of the next block to deliver
      FMasterThread: TLZXMasterThread;
      procedure BlockIsFinished(ABlock: PLZXFinishedBlock);
      function GetRunning: Boolean;
    public
      constructor Create(AThreadCount: Integer);
      destructor Destroy; override;
      procedure Execute(WaitForFinish: Boolean = True);
      property BlockSize: Integer read FBlockSize write FBlockSize;
      property Running: Boolean read GetRunning;
      property WindowSizeCode: Integer read FWindowSizeCode write FWindowSizeCode;
      // All four event properties below must be assigned before Execute is called.
      property OnGetData: TLZXGetDataMethod read FOnGetData write FOnGetData;
      property OnChunkDone: TLZXChunkDoneMethod read FOnChunkDone write FOnChunkDone;
      property OnIsEndOfFile: TLZXIsEndOfFileMethod read FOnIsEndOfFile write FOnIsEndOfFile;
      property OnMarkFrame: TLZXMarkFrameMethod read FOnMarkFrame write FOnMarkFrame;
    end;
  { TLZXMasterThread }

    { Coordinating thread: creates the workers, feeds them input blocks and
      keeps track of which of them are still busy. The members declared before
      the "public" keyword carry no explicit visibility specifier, exactly as
      in the original declaration; TLZXCompressor accesses some of them. }
    TLZXMasterThread = class(TThread)
      FCompressor: TLZXCompressor;            // owner, source of callbacks and settings
      FBusyThreads: TFPList;                  // workers that currently hold a block
      FLockData: TRTLCriticalSection;         // taken by Lock / UnLock
      FLockQueueThread: TRTLCriticalSection;  // taken by LockTmpData / UnLockTmpData
      FDataRemains: Boolean;                  // cached negation of OnIsEndOfFile
      FBlockNumber: Integer;                  // index given to the next queued block
      FRunning: Boolean;                      // True while a compression run is active
      FMemList: TFPList;                      // recycled output buffers
      // only used inside a critical section!!
      // belongs to a Worker thread which will free it
      FTmpData: Pointer;
      FTmpDataSize: Integer;
      procedure UpdateDataRemains;
      function BlockDone(Worker: TLZXWorkerThread; ABlock: PLZXFinishedBlock): Boolean;
      procedure WorkerFinished(Sender: TObject);
      function GetFreeMemoryChunk: Pointer;
      procedure Lock;
      procedure UnLock;
      procedure LockTmpData;
      procedure UnLockTmpData;
      function Working: Boolean;
      function DataRemains: Boolean;
      function Running: Boolean;
      function QueueThread(Thread: TLZXWorkerThread): Boolean;
    public
      procedure Execute; override;
      constructor Create(Compressor: TLZXCompressor);
      destructor Destroy; override;
    end;
  { TLZXWorkerThread }

    { Compresses one input block at a time with its own LZX state and reports
      every finished block to the master thread. }
    TLZXWorkerThread = class(TThread)
    private
      Data: PByte;                  // input buffer, filled by the master's QueueThread
      DataSize: Integer;            // number of valid bytes in Data
      DataCursor: Integer;          // read position inside Data
      Frame1C: Integer;             // first frame mark, compressed size
      Frame1U: Integer;             // first frame mark, uncompressed size
      Frame2C: Integer;             // later frame mark, compressed size
      Frame2U: Integer;             // later frame mark, uncompressed size
      LZXData: Plzx_data;           // compressor state created by lzx_init
      CompressedData: PByte;        // output buffer
      CompressedDataSize: Integer;  // compressed written size. not the size of the array
      BlockNumber: Integer;         // index of the block being compressed
      WindowSizeCode: Integer;
      BlockSize: Integer;
      MasterThread: TLZXMasterThread;
      ShouldSuspend: Boolean;       // set when the master had no further block
      // callbacks for lzxcomp
      function GetBytes(ACount: Longint; ABuffer: Pointer): LongInt; cdecl;
      function WriteBytes(ACount: LongInt; ABuffer: Pointer): LongInt; cdecl;
      procedure MarkFrame(UnCompressedSize: DWord; CompressedSize: DWord); cdecl;
      function IsEndOfFile: LongBool; cdecl;
      // end callbacks
      procedure NotifyMasterDone;
    protected
      procedure Execute; override;
    public
      procedure CompressData(ABlockNumber: Integer);
      constructor Create(AMaster: TLZXMasterThread; AWindowSizeCode: Integer; ABlockSize: Integer);
      destructor Destroy; override;
    end;
  135. implementation
  136. uses
  137. Sysutils; // for Sleep()
  138. { TLZXCompressor }
  { Accepts a block finished by a worker and delivers blocks to the client in
    ascending Index order.

    If ABlock is the block currently awaited it is sent right away; otherwise
    it is parked in FinishedBlocks. Afterwards the parked blocks are scanned
    repeatedly and every block whose turn has come is sent too.

    ABlock: heap-allocated record; it is disposed once it has been sent. }
  procedure TLZXCompressor.BlockIsFinished(ABlock: PLZXFinishedBlock);

    { Reports one block through the callbacks, hands its buffer to the
      master's FMemList for reuse and disposes the record. }
    procedure SendChunk(AChunk: PLZXFinishedBlock);
    begin
      // The sequence counter has to advance for every delivered block. It used
      // to be incremented only when OnMarkFrame was assigned, which stalled
      // delivery of every subsequent block when that handler was missing.
      Inc(NextBlockNeeded);
      if Assigned(FOnMarkFrame) then
      begin
        Inc(FTotalCompressedSize, AChunk^.Frame1CSize);
        Inc(FTotalUnCompressedSize, AChunk^.Frame1USize);
        FOnMarkFrame(Self, FTotalCompressedSize, FTotalUnCompressedSize);
        // A second frame is only reported when the block actually contains one.
        if AChunk^.Frame2CSize > 0 then
        begin
          Inc(FTotalCompressedSize, AChunk^.Frame2CSize);
          Inc(FTotalUnCompressedSize, AChunk^.Frame2USize);
          FOnMarkFrame(Self, FTotalCompressedSize, FTotalUnCompressedSize);
        end;
      end;
      FOnChunkDone(Self, AChunk^.CompressedSize, AChunk^.UnCompressedSize, AChunk^.Data);
      FMasterThread.FMemList.Add(AChunk^.Data);
      Dispose(AChunk);
    end;

  var
    TmpBlock: PLZXFinishedBlock;
    FoundMatch: Boolean;
    i: Integer;
  begin
    if NextBlockNeeded = ABlock^.Index then
      SendChunk(ABlock)
    else
      FinishedBlocks.Add(ABlock);

    // Sending a block advances NextBlockNeeded, which may make another parked
    // block eligible, so keep rescanning until a full pass finds nothing.
    repeat
      FoundMatch := False;
      for i := FinishedBlocks.Count-1 downto 0 do
      begin
        TmpBlock := PLZXFinishedBlock(FinishedBlocks.Items[i]);
        if TmpBlock^.Index = NextBlockNeeded then
        begin
          FoundMatch := True;
          SendChunk(TmpBlock);
          FinishedBlocks.Delete(i);
        end;
      end;
    until not FoundMatch;
  end;
  { Getter of the Running property: forwards the master thread's Running state. }
  function TLZXCompressor.GetRunning: Boolean;
  var
    MasterIsRunning: Boolean;
  begin
    MasterIsRunning := FMasterThread.Running;
    Result := MasterIsRunning;
  end;
  { Creates a compressor that will use AThreadCount worker threads.
    The block size defaults to 64 KiB and the window size code to 16. The
    master thread is created here as well (its constructor creates it
    suspended). }
  constructor TLZXCompressor.Create(AThreadCount: Integer);
  begin
    inherited Create;
    FWindowSizeCode := 16;
    FBlockSize := $10000; // 1 shl 16
    FThreadCount := AThreadCount;
    FMasterThread := TLZXMasterThread.Create(Self);
    FinishedBlocks := TFPList.Create;
  end;
  { Releases the master thread and the list of parked blocks.

    Blocks that are still parked in FinishedBlocks (finished out of order and
    never delivered) are released as well: each of them owns a heap record and
    a data buffer that is not tracked anywhere else, so they would leak. }
  destructor TLZXCompressor.Destroy;
  var
    i: Integer;
    Pending: PLZXFinishedBlock;
  begin
    FMasterThread.Free;
    if FinishedBlocks <> nil then
      for i := 0 to FinishedBlocks.Count-1 do
      begin
        Pending := PLZXFinishedBlock(FinishedBlocks.Items[i]);
        if Pending = nil then
          Continue;
        // The buffer of a parked block has not yet been returned to the
        // master's FMemList, so it must be freed here.
        if Pending^.Data <> nil then
          Freemem(Pending^.Data);
        Dispose(Pending);
      end;
    FinishedBlocks.Free;
    inherited Destroy;
  end;
  { Starts the compression run.

    Resets the running totals, marks the master thread as running and resumes
    it. With WaitForFinish = True the call returns only after Running has
    become False; while waiting it keeps calling CheckSynchronize(10). }
  procedure TLZXCompressor.Execute(WaitForFinish: Boolean = True);
  begin
    FTotalUnCompressedSize := 0;
    FTotalCompressedSize := 0;
    // Flag the run as active before the thread starts so that Running is
    // already True when the wait loop below checks it.
    FMasterThread.FRunning := True;
    FMasterThread.Resume;
    if not WaitForFinish then
      Exit;
    while Running do
      CheckSynchronize(10);
  end;
  211. { TLZXMasterThread }
  { Refreshes FDataRemains by asking the compressor's OnIsEndOfFile handler. }
  procedure TLZXMasterThread.UpdateDataRemains;
  var
    AtEnd: Boolean;
  begin
    AtEnd := FCompressor.FOnIsEndOfFile(FCompressor);
    FDataRemains := not AtEnd;
  end;
  { Called by a worker (from NotifyMasterDone) when it has finished a block.

    Passes the block to the compressor for delivery and then tries to give the
    worker another block.

    Worker: the thread that produced ABlock.
    ABlock: the finished block; ownership passes to the compressor.
    Result: True when the worker received a new block and should keep going,
            False when no input was left; in that case the worker is removed
            from the busy list and asked to terminate. }
  function TLZXMasterThread.BlockDone(Worker: TLZXWorkerThread; ABlock: PLZXFinishedBlock): Boolean;
  begin
    Lock;
    try
      FCompressor.BlockIsFinished(ABlock);
      // Only report success when a block was really queued. QueueThread checks
      // for remaining data on its own and may still decline; ignoring its
      // result would let the worker loop again without new input.
      Result := DataRemains;
      if Result then
        Result := QueueThread(Worker);
      if not Result then
      begin
        FBusyThreads.Remove(Worker);
        Worker.Terminate;
        if FBusyThreads.Count = 0 then
          Resume;
      end;
    finally
      // Release the lock even when one of the client callbacks raises.
      UnLock;
    end;
  end;
  { OnTerminate handler installed on every worker thread.

    Removes the worker from the busy list and re-raises the exception that
    ended the worker, if there was one.

    The worker is deliberately NOT freed here: every worker is created with
    FreeOnTerminate = True (both in its constructor and in
    TLZXMasterThread.Execute), which asks TThread to destroy the object itself
    once it has terminated. Freeing it in this handler as well would destroy
    the same object twice.

    NOTE(review): the raised object is the worker's own FatalException
    instance - confirm who is expected to own it after the raise. }
  procedure TLZXMasterThread.WorkerFinished(Sender: TObject);
  begin
    FBusyThreads.Remove(Sender);
    if TThread(Sender).FatalException <> nil then
      raise Exception(TThread(Sender).FatalException);
  end;
  { Returns a buffer for one compressed block: the first recycled buffer of
    FMemList when there is one, otherwise a fresh allocation of
    FCompressor.BlockSize bytes. }
  function TLZXMasterThread.GetFreeMemoryChunk: Pointer;
  begin
    if not (FMemList.Count > 0) then
    begin
      // Nothing to recycle.
      Result := Getmem(FCompressor.BlockSize);
      Exit;
    end;
    Result := FMemList.Items[0];
    FMemList.Delete(0);
  end;
  { Enters the FLockData critical section; pair every call with UnLock. }
  procedure TLZXMasterThread.Lock;
  begin
    EnterCriticalSection(FLockData);
  end;
  { Leaves the FLockData critical section entered by Lock. }
  procedure TLZXMasterThread.UnLock;
  begin
    LeaveCriticalSection(FLockData);
  end;
  { Enters the FLockQueueThread critical section, which QueueThread holds
    while it hands a block to a worker; pair with UnLockTmpData. }
  procedure TLZXMasterThread.LockTmpData;
  begin
    EnterCriticalSection(FLockQueueThread);
  end;
  { Leaves the FLockQueueThread critical section entered by LockTmpData. }
  procedure TLZXMasterThread.UnLockTmpData;
  begin
    LeaveCriticalSection(FLockQueueThread);
  end;
  { True while at least one worker is registered in FBusyThreads. }
  function TLZXMasterThread.Working: Boolean;
  var
    BusyCount: Integer;
  begin
    BusyCount := FBusyThreads.Count;
    Result := BusyCount > 0;
  end;
  { Re-queries the client's end-of-file state, stores the answer in
    FDataRemains and returns it: True while input is still available. }
  function TLZXMasterThread.DataRemains: Boolean;
  begin
    // Same statement UpdateDataRemains executes, written out in place.
    FDataRemains := not FCompressor.FOnIsEndOfFile(FCompressor);
    Result := FDataRemains;
  end;
  { Returns the FRunning flag, which TLZXCompressor.Execute and this thread's
    Execute set to True and which Execute clears when it is done. }
  function TLZXMasterThread.Running: Boolean;
  begin
    Exit(FRunning);
  end;
  { Gives the next input block to Thread.

    Under the FLockQueueThread lock: checks whether input remains, fills the
    worker's input buffer through OnGetData, assigns the next block number and
    resumes the worker when it is suspended.

    Thread: worker that is to receive the block.
    Result: True when a block was handed over, False when no input was left
            (the worker is left untouched in that case). }
  function TLZXMasterThread.QueueThread(Thread: TLZXWorkerThread): Boolean;
  begin
    LockTmpData;
    try
      Result := DataRemains;
      if not Result then
        Exit; // the finally section still releases the lock
      FDataRemains := False;
      Thread.DataSize := FCompressor.OnGetData(FCompressor, FCompressor.FBlockSize, Thread.Data);
      Thread.CompressData(FBlockNumber);
      Inc(FBlockNumber);
      if Thread.Suspended then
        Thread.Resume;
    finally
      // The client callbacks run while the lock is held; make sure an
      // exception raised by one of them cannot leave the lock taken forever.
      UnLockTmpData;
    end;
  end;
  { Thread body of the master.

    Creates FCompressor.FThreadCount workers and tries to queue a first block
    for each of them, then polls until no worker is registered as busy any
    more and finally clears FRunning.

    Changes compared with the straightforward version:
    - A worker is registered in FBusyThreads before it is started, under the
      same lock BlockDone holds. Registering it only after QueueThread had
      already resumed it left a window in which the worker could finish and
      be removed from the list before it had been added, leaving a stale
      entry that kept Working True forever.
    - A worker for which no input was left is freed instead of being
      abandoned in its never-started state together with its buffers.
    - FRunning is cleared in a finally section so that Running cannot stay
      True after an exception. }
  procedure TLZXMasterThread.Execute;
  var
    i: Integer;
    Thread: TLZXWorkerThread;
    Queued: Boolean;
  begin
    FRunning := True;
    try
      for i := 0 to FCompressor.FThreadCount-1 do
      begin
        Thread := TLZXWorkerThread.Create(Self, FCompressor.WindowSizeCode, FCompressor.BlockSize);
        Thread.FreeOnTerminate := True;
        Thread.OnTerminate := @WorkerFinished;
        Lock;
        try
          FBusyThreads.Add(Thread);
          Queued := QueueThread(Thread);
          if not Queued then
            FBusyThreads.Remove(Thread);
        finally
          UnLock;
        end;
        if not Queued then
        begin
          // This worker never received a block and was never started. Detach
          // the terminate handler first so that destroying it here cannot
          // trigger WorkerFinished for an object that is already going away.
          Thread.OnTerminate := nil;
          Thread.Free;
        end;
      end;
      //Suspend;
      while Working do
        Sleep(0);
    finally
      FRunning := False;
    end;
  end;
  { Creates the master thread for Compressor. The thread is created suspended
    (inherited Create(True)); the busy list, the buffer pool and both critical
    sections are set up here. }
  constructor TLZXMasterThread.Create(Compressor: TLZXCompressor);
  begin
    inherited Create(True);
    InitCriticalSection(FLockData);
    InitCriticalSection(FLockQueueThread);
    FMemList := TFPList.Create;
    FBusyThreads := TFPList.Create;
    FDataRemains := True;
    FCompressor := Compressor;
  end;
  { Releases the workers that are still registered, the pooled buffers, both
    lists and both critical sections.

    The critical sections are destroyed only after the workers are gone: a
    worker that is still reporting a block goes through BlockDone and
    QueueThread, which take these locks. The busy list is drained from the
    back, removing each entry before the worker is freed, and each worker's
    OnTerminate handler is detached first, so that WorkerFinished cannot modify
    the list (or touch the worker again) while it is being torn down. }
  destructor TLZXMasterThread.Destroy;
  var
    i: Integer;
    Worker: TThread;
  begin
    while FBusyThreads.Count > 0 do
    begin
      Worker := TThread(FBusyThreads.Items[FBusyThreads.Count-1]);
      FBusyThreads.Delete(FBusyThreads.Count-1);
      Worker.OnTerminate := nil;
      Worker.Free;
    end;
    for i := 0 to FMemList.Count-1 do
      Freemem(FMemList.Items[i]);
    FBusyThreads.Free;
    FMemList.Free;
    DoneCriticalSection(FLockData);
    DoneCriticalSection(FLockQueueThread);
    inherited Destroy;
  end;
  339. { TLZXWorkerThread }
  { Input callback registered with lzx_init: copies up to ACount bytes from the
    current read position of Data into ABuffer, advances the read position and
    returns the number of bytes copied. The request is capped at the number of
    bytes left in the block. }
  function TLZXWorkerThread.GetBytes(ACount: Longint; ABuffer: Pointer): LongInt; cdecl;
  var
    Available: Integer;
  begin
    Available := DataSize - DataCursor;
    if Available < ACount then
      ACount := Available;
    Move(Data[DataCursor], ABuffer^, ACount);
    Result := ACount;
    Inc(DataCursor, ACount);
  end;
  { Output callback registered with lzx_init: appends ACount bytes from ABuffer
    to CompressedData, advances CompressedDataSize and returns ACount.
    No check against the size of the output buffer is made here. }
  function TLZXWorkerThread.WriteBytes(ACount: LongInt; ABuffer: Pointer): LongInt; cdecl;
  var
    Target: PByte;
  begin
    Target := @CompressedData[CompressedDataSize];
    Move(ABuffer^, Target^, ACount);
    Inc(CompressedDataSize, ACount);
    Result := ACount;
  end;
  { Frame callback registered with lzx_init. The first call (recognised by
    Frame1C still being 0) is stored as frame 1; every later call overwrites
    the frame 2 values. }
  procedure TLZXWorkerThread.MarkFrame(UnCompressedSize: DWord; CompressedSize: DWord); cdecl;
  begin
    if Frame1C <> 0 then
    begin
      // Frame 1 has already been recorded.
      Frame2C := CompressedSize;
      Frame2U := UnCompressedSize;
      Exit;
    end;
    Frame1C := CompressedSize;
    Frame1U := UnCompressedSize;
  end;
  { End-of-input callback registered with lzx_init: True once every byte of
    the current block has been read. }
  function TLZXWorkerThread.IsEndOfFile: LongBool; cdecl;
  var
    Remaining: Integer;
  begin
    Remaining := DataSize - DataCursor;
    Result := LongBool(Remaining <= 0);
  end;
  { Packages the block that was just compressed and reports it to the master.
    Invoked from Execute through MasterThread.Synchronize.

    Clears the length counters of the LZX state, copies the compressed bytes
    into a buffer obtained from the master, fills in a new TLZXFinishedBlock,
    clears the frame marks for the next block and stores in ShouldSuspend
    whether the master declined to hand out another block. }
  procedure TLZXWorkerThread.NotifyMasterDone;
  var
    Finished: PLZXFinishedBlock;
  begin
    LZXData^.len_uncompressed_input := 0;
    LZXData^.len_compressed_output := 0;

    New(Finished);
    Finished^.Index := BlockNumber;
    Finished^.UnCompressedSize := DataSize;
    Finished^.CompressedSize := CompressedDataSize;
    Finished^.Data := MasterThread.GetFreeMemoryChunk;
    Move(CompressedData^, Finished^.Data^, CompressedDataSize);

    // The frame 2 values are stored relative to the frame 1 mark.
    Finished^.Frame1CSize := Frame1C;
    Finished^.Frame1USize := Frame1U;
    Finished^.Frame2CSize := Frame2C - Frame1C;
    Finished^.Frame2USize := Frame2U - Frame1U;

    Frame1C := 0;
    Frame1U := 0;
    Frame2C := 0;
    Frame2U := 0;

    ShouldSuspend := not MasterThread.BlockDone(Self, Finished);
  end;
  { Prepares the worker for a new block: rewinds the input cursor, empties the
    output buffer and remembers the block number. The compression itself is
    carried out by Execute. }
  procedure TLZXWorkerThread.CompressData(ABlockNumber: Integer);
  begin
    CompressedDataSize := 0;
    DataCursor := 0;
    BlockNumber := ABlockNumber;
  end;
  { Thread body of a worker. Until the thread is terminated it resets the LZX
    state, compresses the block currently loaded, reports it through
    NotifyMasterDone and suspends itself when ShouldSuspend was set.

    NOTE(review): ShouldSuspend is set exactly when BlockDone returned False,
    which is also the case in which BlockDone calls Terminate on this thread;
    the thread therefore suspends itself after having been asked to terminate.
    Confirm that something resumes it afterwards. }
  procedure TLZXWorkerThread.Execute;
  var
    WindowBytes: LongInt;
  begin
    WindowBytes := 1 shl WindowSizeCode;
    while not Terminated do
    begin
      lzx_reset(LZXData);
      lzx_compress_block(LZXData, WindowBytes, True);
      MasterThread.Synchronize(@NotifyMasterDone);
      if not ShouldSuspend then
        Continue;
      Suspend;
    end;
  end;
  { Creates a suspended worker that belongs to AMaster.

    Allocates an input and an output buffer of ABlockSize bytes each and
    initialises the LZX state with this object's GetBytes, IsEndOfFile,
    WriteBytes and MarkFrame methods as callbacks. The method addresses are
    cast to the plain callback types and Self is passed as their argument.
    The thread frees itself on termination (FreeOnTerminate = True). }
  constructor TLZXWorkerThread.Create(AMaster: TLZXMasterThread;
    AWindowSizeCode: Integer; ABlockSize: Integer);
  begin
    inherited Create(True);
    FreeOnTerminate := True;
    MasterThread := AMaster;
    BlockSize := ABlockSize;
    WindowSizeCode := AWindowSizeCode;
    CompressedData := GetMem(ABlockSize);
    Data := GetMem(ABlockSize);
    lzx_init(@LZXData, longint(WindowSizeCode),
             TGetBytesFunc(@TLZXWorkerThread.GetBytes), Self,
             TIsEndOfFileFunc(@TLZXWorkerThread.IsEndOfFile),
             TWriteBytesFunc(@TLZXWorkerThread.WriteBytes), Self,
             TMarkFrameFunc(@TLZXWorkerThread.MarkFrame), Self);
  end;
  { Shuts down the LZX state and releases the input and output buffers before
    the inherited thread destructor runs. }
  destructor TLZXWorkerThread.Destroy;
  begin
    lzx_finish(LZXData, nil);
    FreeMem(CompressedData);
    FreeMem(Data);
    inherited Destroy;
  end;
  443. end.