123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- { Copyright (C) <2009> <Andrew Haines> lzxcompressthread.pas
- This library is free software; you can redistribute it and/or modify it
- under the terms of the GNU Library General Public License as published by
- the Free Software Foundation; either version 2 of the License, or (at your
- option) any later version.
- This program is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License
- for more details.
- You should have received a copy of the GNU Library General Public License
- along with this library; if not, write to the Free Software Foundation,
- Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
- }
- {
- See the file COPYING.FPC, included in this distribution,
- for details about the copyright.
- }
- unit lzxcompressthread;
- {$mode objfpc}{$H+}
- interface
- uses
- Classes, paslzxcomp;
- type
- TLZXCompressor = class;
- TLZXMasterThread = class;
- TLZXWorkerThread = class;
- TLZXGetDataMethod = function(Sender: TLZXCompressor; WantedByteCount: Integer; Buffer: Pointer): Integer of object;
- TLZXIsEndOfFileMethod = function(Sender: TLZXCompressor): Boolean of object;
- TLZXChunkDoneMethod = procedure(Sender: TLZXCompressor; CompressedSize: Integer; UncompressedSize: Integer; Buffer: Pointer) of object;
- TLZXMarkFrameMethod = procedure(Sender: TLZXCompressor; CompressedSize: Integer; UncompressedSize: Integer) of object;
- PLZXFinishedBlock = ^TLZXFinishedBlock;
- TLZXFinishedBlock = record
- CompressedSize: Integer;
- UnCompressedSize: Integer;
- Frame1CSize,
- Frame1USize,
- Frame2CSize,
- Frame2USize: Integer;
- Index: Integer;
- Data: Pointer;
- end;
- { TLZXCompressor }
- TLZXCompressor = class(TObject)
- private
- FOnMarkFrame: TLZXMarkFrameMethod;
- FThreadCount: Integer;
- FBlockSize: Integer;
- FTotalCompressedSize: DWord;
- FTotalUnCompressedSize: DWord;
- FOnChunkDone: TLZXChunkDoneMethod;
- FOnGetData: TLZXGetDataMethod;
- FOnIsEndOfFile: TLZXIsEndOfFileMethod;
- FWindowSizeCode: Integer;
- FinishedBlocks: TFPList;
- NextBlockNeeded: Integer;
- FMasterThread: TLZXMasterThread;
- procedure BlockIsFinished(ABlock: PLZXFinishedBlock);
- function GetRunning: Boolean;
- public
- constructor Create(AThreadCount: Integer);
- destructor Destroy; override;
- procedure Execute(WaitForFinish: Boolean = True);
- property BlockSize: Integer read FBlockSize write FBlockSize;
- property Running: Boolean read GetRunning;
- property WindowSizeCode: Integer read FWindowSizeCode write FWindowSizeCode;
- // the following properties must all be assigned
- property OnGetData: TLZXGetDataMethod read FOnGetData write FOnGetData;
- property OnChunkDone: TLZXChunkDoneMethod read FOnChunkDone write FOnChunkDone;
- property OnIsEndOfFile: TLZXIsEndOfFileMethod read FOnIsEndOfFile write FOnIsEndOfFile;
- property OnMarkFrame: TLZXMarkFrameMethod read FOnMarkFrame write FOnMarkFrame;
- end;
- { TLZXMasterThread }
- TLZXMasterThread = class(TThread)
- FCompressor: TLZXCompressor;
- FBusyThreads: TFPList;
- FLockData: TRTLCriticalSection;
- FLockQueueThread: TRTLCriticalSection;
- FDataRemains: Boolean;
- FBlockNumber: Integer;
- FRunning: Boolean;
- FMemList: TFPList;
- // only used inside a critical section!!
- // belongs to a Worker thread which will free it
- FTmpData: Pointer;
- FTmpDataSize: Integer;
- procedure UpdateDataRemains;
- function BlockDone(Worker: TLZXWorkerThread; ABlock: PLZXFinishedBlock): Boolean;
- procedure WorkerFinished(Sender: TObject);
- function GetFreeMemoryChunk: Pointer;
- procedure Lock;
- Procedure UnLock;
- procedure LockTmpData;
- procedure UnLockTmpData;
- function Working: Boolean;
- function DataRemains: Boolean;
- function Running: Boolean;
- function QueueThread(Thread: TLZXWorkerThread): Boolean;
- public
- procedure Execute; override;
- constructor Create(Compressor: TLZXCompressor);
- destructor Destroy; override;
- end;
- { TLZXWorkerThread }
- TLZXWorkerThread = class(TThread)
- private
- Data: PByte;
- DataSize: Integer;
- DataCursor: Integer;
- Frame1C,
- Frame1U,
- Frame2C,
- Frame2U: Integer;
- LZXData: Plzx_data;
- CompressedData: PByte;
- CompressedDataSize: Integer; // compressed written size. not the size of the array
- BlockNumber: Integer;
- WindowSizeCode: Integer;
- BlockSize: Integer;
- MasterThread: TLZXMasterThread;
- ShouldSuspend: Boolean;
- // callbacks for lzxcomp
- function GetBytes(ACount: Longint; ABuffer: Pointer): LongInt; cdecl;
- function WriteBytes(ACount: LongInt; ABuffer: Pointer): LongInt; cdecl;
- procedure MarkFrame(UnCompressedSize: DWord; CompressedSize: DWord); cdecl;
- function IsEndOfFile: LongBool; cdecl;
- // end callbacks
- procedure NotifyMasterDone;
- protected
- procedure Execute; override;
- public
- procedure CompressData(ABlockNumber: Integer);
- constructor Create(AMaster: TLZXMasterThread; AWindowSizeCode: Integer; ABlockSize: Integer);
- destructor Destroy; override;
- end;
- implementation
- uses
- Sysutils; // for Sleep()
- { TLZXCompressor }
- procedure TLZXCompressor.BlockIsFinished(ABlock: PLZXFinishedBlock);
- procedure SendChunk(AChunk: PLZXFinishedBlock);
- begin
- if Assigned(FOnMarkFrame) then
- begin
- Inc(FTotalCompressedSize, AChunk^.Frame1CSize);
- Inc(FTotalUnCompressedSize, AChunk^.Frame1USize);
- Inc(NextBlockNeeded);
- FOnMarkFrame(Self, FTotalCompressedSize, FTotalUnCompressedSize);
- if AChunk^.Frame2CSize > 0 then
- begin
- Inc(FTotalCompressedSize, AChunk^.Frame2CSize);
- Inc(FTotalUnCompressedSize, AChunk^.Frame2USize);
- FOnMarkFrame(Self, FTotalCompressedSize, FTotalUnCompressedSize);
- end;
- end;
- FOnChunkDone(Self, AChunk^.CompressedSize, AChunk^.UnCompressedSize, AChunk^.Data);
- FMasterThread.FMemList.Add(AChunk^.Data);
- Dispose(AChunk);
- end;
- var
- TmpBlock : PLZXFinishedBlock;
- FoundMatch: Boolean;
- i: Integer;
- begin
- if NextBlockNeeded = ABlock^.Index then
- SendChunk(ABlock)
- else
- FinishedBlocks.Add(ABlock);
- repeat
- FoundMatch := False;
- for i := FinishedBlocks.Count-1 downto 0 do
- begin
- TmpBlock := PLZXFinishedBlock(FinishedBlocks.Items[i]);
- if TmpBlock^.Index = NextBlockNeeded then
- begin
- FoundMatch := True;
- SendChunk(TmpBlock);
- FinishedBlocks.Delete(i);
- end;
- end;
- until not FoundMatch;
- end;
- function TLZXCompressor.GetRunning: Boolean;
- begin
- Result := FMasterThread.Running;
- end;
- constructor TLZXCompressor.Create(AThreadCount: Integer);
- begin
- inherited Create;
- FThreadCount := AThreadCount;
- FBlockSize := 1 shl 16 ; // $10000;
- FWindowSizeCode := 16;
- FMasterThread := TLZXMasterThread.Create(Self);
- FinishedBlocks := TFPList.Create;
- end;
- destructor TLZXCompressor.Destroy;
- begin
- FMasterThread.Free;
- FinishedBlocks.Free;
- inherited Destroy;
- end;
- procedure TLZXCompressor.Execute(WaitForFinish: Boolean = True);
- begin
- FTotalCompressedSize := 0;
- FTotalUnCompressedSize := 0;
- FMasterThread.FRunning:=True;
- FMasterThread.Resume;
- if WaitForFinish then
- While Running do
- CheckSynchronize(10);
- end;
- { TLZXMasterThread }
- procedure TLZXMasterThread.UpdateDataRemains;
- begin
- FDataRemains := not FCompressor.FOnIsEndOfFile(FCompressor);
- end;
- function TLZXMasterThread.BlockDone(Worker: TLZXWorkerThread; ABlock: PLZXFinishedBlock): Boolean;
- begin
- Lock;
- REsult := True;
- FCompressor.BlockIsFinished(ABlock);
- if DataRemains then
- QueueThread(Worker)
- else
- begin
- Result := False;
- FBusyThreads.Remove(Worker);
- Worker.Terminate;
- if FBusyThreads.Count = 0 then
- Resume;
- end;
- Unlock;
- end;
- procedure TLZXMasterThread.WorkerFinished(Sender: TObject);
- begin
- FBusyThreads.Remove(Sender);
- if TThread(Sender).FatalException <> nil then
- Raise Exception(TThread(Sender).FatalException);
- Sender.Free;
- end;
- function TLZXMasterThread.GetFreeMemoryChunk: Pointer;
- begin
- if FMemList.Count >0 then
- begin
- Result := FMemList.Items[0];
- FMemList.Delete(0);
- end
- else
- Result := Getmem(FCompressor.BlockSize);
- end;
- procedure TLZXMasterThread.Lock;
- begin
- EnterCriticalsection(FLockData);
- end;
- procedure TLZXMasterThread.UnLock;
- begin
- LeaveCriticalsection(FLockData);
- end;
- procedure TLZXMasterThread.LockTmpData;
- begin
- EnterCriticalsection(FLockQueueThread);
- end;
- procedure TLZXMasterThread.UnLockTmpData;
- begin
- LeaveCriticalsection(FLockQueueThread);
- end;
- function TLZXMasterThread.Working: Boolean;
- begin
- Result := FBusyThreads.Count > 0;
- end;
- function TLZXMasterThread.DataRemains: Boolean;
- begin
- UpdateDataRemains;
- Result := FDataRemains;
- end;
- function TLZXMasterThread.Running: Boolean;
- begin
- REsult := FRunning;
- end;
- function TLZXMasterThread.QueueThread(Thread: TLZXWorkerThread): Boolean;
- begin
- LockTmpData;
- Result := DataRemains;
- if Not Result then
- begin
- UnLockTmpData;
- Exit;
- end;
- FDataRemains := False;
- Thread.DataSize := FCompressor.OnGetData(FCompressor, FCompressor.FBlockSize, Thread.Data);
- Thread.CompressData(FBlockNumber);
- Inc(FBlockNumber);
- if Thread.Suspended then
- Thread.Resume;
- UnLockTmpData;
- end;
- procedure TLZXMasterThread.Execute;
- var
- i: Integer;
- Thread: TLZXWorkerThread;
- begin
- FRunning:= True;
- for i := 0 to FCompressor.FThreadCount-1 do
- begin
- Thread := TLZXWorkerThread.Create(Self, FCompressor.WindowSizeCode, FCompressor.BlockSize);
- Thread.FreeOnTerminate := True;
- Thread.OnTerminate:= @WorkerFinished;
- if QueueThread(Thread) then
- FBusyThreads.Add(Thread);
- end;
- //Suspend;
- while Working do
- begin
- Sleep(0);
- end;
- FRunning:= False;
- end;
- constructor TLZXMasterThread.Create(Compressor: TLZXCompressor);
- begin
- Inherited Create(True);
- FCompressor := Compressor;
- FDataRemains := True;
- FBusyThreads := TFPList.Create;
- FMemList := TFPList.Create;
- InitCriticalSection(FLockData);
- InitCriticalSection(FLockQueueThread);
- end;
- destructor TLZXMasterThread.Destroy;
- var
- i: Integer;
- begin
- DoneCriticalsection(FLockData);
- DoneCriticalsection(FLockQueueThread);
- for i := 0 to FBusyThreads.Count-1 do TObject(FBusyThreads.Items[i]).Free;
- for i := 0 to FMemList.Count-1 do Freemem(FMemList.Items[i]);
- FBusyThreads.Free;
- FMemList.Free;
- inherited Destroy;
- end;
- { TLZXWorkerThread }
- function TLZXWorkerThread.GetBytes(ACount: Longint; ABuffer: Pointer): LongInt; cdecl;
- var
- MaxBytes: Integer;
- begin
- MaxBytes := DataSize - DataCursor;
- if ACount > MaxBytes then
- ACount := MaxBytes;
- Move(Data[DataCursor], ABuffer^, ACount);
- Inc(DataCursor, ACount);
- Result := ACount;
- end;
- function TLZXWorkerThread.WriteBytes(ACount: LongInt; ABuffer: Pointer
- ): LongInt; cdecl;
- begin
- Move(ABuffer^, CompressedData[CompressedDataSize], ACount);
- Inc(CompressedDataSize, ACount);
- Result := ACount;
- end;
- procedure TLZXWorkerThread.MarkFrame(UnCompressedSize: DWord;
- CompressedSize: DWord); cdecl;
- begin
- if Frame1C = 0 then
- begin
- Frame1C := CompressedSize;
- Frame1U := UnCompressedSize;
- end
- else
- begin
- Frame2C := CompressedSize;
- Frame2U := UnCompressedSize;
- end;
- end;
- function TLZXWorkerThread.IsEndOfFile: LongBool; cdecl;
- begin
- Result := LongBool(DataSize - DataCursor <= 0);
- end;
- procedure TLZXWorkerThread.NotifyMasterDone;
- var
- Block: PLZXFinishedBlock;
- begin
- LZXData^.len_compressed_output:=0;
- LZXData^.len_uncompressed_input:=0;
- New(Block);
- Block^.Data := MasterThread.GetFreeMemoryChunk;
- Move(CompressedData^, Block^.Data^, CompressedDataSize);
- Block^.CompressedSize := CompressedDataSize;
- Block^.UnCompressedSize:= DataSize;
- Block^.Index := BlockNumber;
- Block^.Frame1CSize := Frame1C;
- Block^.Frame2CSize := Frame2C-Frame1C;
- Block^.Frame1USize := Frame1U;
- Block^.Frame2USize := Frame2U-Frame1U;
- Frame1C := 0;
- Frame2C := 0;
- Frame1U := 0;
- Frame2U := 0;
- ShouldSuspend := not MasterThread.BlockDone(Self, Block);
- end;
- procedure TLZXWorkerThread.CompressData(ABlockNumber: Integer);
- begin
- BlockNumber := ABlockNumber;
- DataCursor := 0;
- CompressedDataSize := 0;
- end;
- procedure TLZXWorkerThread.Execute;
- var
- WSize: LongInt;
- begin
- WSize := 1 shl WindowSizeCode;
- while not Terminated do
- begin
- lzx_reset(LZXdata);
- lzx_compress_block(LZXdata, WSize, True);
- MasterThread.Synchronize(@NotifyMasterDone);
- if ShouldSuspend then
- begin
- Suspend;
- end;
- end;
- end;
- constructor TLZXWorkerThread.Create(AMaster: TLZXMasterThread;
- AWindowSizeCode: Integer; ABlockSize: Integer);
- begin
- Inherited Create(True);
- MasterThread := AMaster;
- WindowSizeCode := AWindowSizeCode;
- BlockSize := ABlockSize;
- FreeOnTerminate := True;
- Data := GetMem(ABlockSize);
- CompressedData:=GetMem(ABlockSize);
- lzx_init(@LZXdata, longint(WindowSizeCode),
- TGetBytesFunc(@TLZXWorkerThread.GetBytes), Self,
- TIsEndOfFileFunc(@TLZXWorkerThread.IsEndOfFile),
- TWriteBytesFunc(@TLZXWorkerThread.WriteBytes), Self,
- TMarkFrameFunc(@TLZXWorkerThread.MarkFrame), Self);
- end;
- destructor TLZXWorkerThread.Destroy;
- begin
- lzx_finish(LZXdata, nil);
- Freemem(Data);
- FreeMem(CompressedData);
- inherited Destroy;
- end;
- end.
|