| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- /* MtCoder.c -- Multi-thread Coder
- 2015-10-13 : Igor Pavlov : Public domain */
- #include "Precomp.h"
- #include "MtCoder.h"
- void LoopThread_Construct(CLoopThread *p)
- {
- Thread_Construct(&p->thread);
- Event_Construct(&p->startEvent);
- Event_Construct(&p->finishedEvent);
- }
- void LoopThread_Close(CLoopThread *p)
- {
- Thread_Close(&p->thread);
- Event_Close(&p->startEvent);
- Event_Close(&p->finishedEvent);
- }
- static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
- {
- CLoopThread *p = (CLoopThread *)pp;
- for (;;)
- {
- if (Event_Wait(&p->startEvent) != 0)
- return SZ_ERROR_THREAD;
- if (p->stop)
- return 0;
- p->res = p->func(p->param);
- if (Event_Set(&p->finishedEvent) != 0)
- return SZ_ERROR_THREAD;
- }
- }
- WRes LoopThread_Create(CLoopThread *p)
- {
- p->stop = 0;
- RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
- RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
- return Thread_Create(&p->thread, LoopThreadFunc, p);
- }
- WRes LoopThread_StopAndWait(CLoopThread *p)
- {
- p->stop = 1;
- if (Event_Set(&p->startEvent) != 0)
- return SZ_ERROR_THREAD;
- return Thread_Wait(&p->thread);
- }
- WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
- WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
- static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
- {
- return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
- }
- static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
- {
- unsigned i;
- for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
- p->inSizes[i] = p->outSizes[i] = 0;
- p->totalInSize = p->totalOutSize = 0;
- p->progress = progress;
- p->res = SZ_OK;
- }
- static void MtProgress_Reinit(CMtProgress *p, unsigned index)
- {
- p->inSizes[index] = 0;
- p->outSizes[index] = 0;
- }
- #define UPDATE_PROGRESS(size, prev, total) \
- if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
- SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
- {
- SRes res;
- CriticalSection_Enter(&p->cs);
- UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
- UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
- if (p->res == SZ_OK)
- p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
- res = p->res;
- CriticalSection_Leave(&p->cs);
- return res;
- }
- static void MtProgress_SetError(CMtProgress *p, SRes res)
- {
- CriticalSection_Enter(&p->cs);
- if (p->res == SZ_OK)
- p->res = res;
- CriticalSection_Leave(&p->cs);
- }
- static void MtCoder_SetError(CMtCoder* p, SRes res)
- {
- CriticalSection_Enter(&p->cs);
- if (p->res == SZ_OK)
- p->res = res;
- CriticalSection_Leave(&p->cs);
- }
- /* ---------- MtThread ---------- */
- void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
- {
- p->mtCoder = mtCoder;
- p->outBuf = 0;
- p->inBuf = 0;
- Event_Construct(&p->canRead);
- Event_Construct(&p->canWrite);
- LoopThread_Construct(&p->thread);
- }
- #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
- static void CMtThread_CloseEvents(CMtThread *p)
- {
- Event_Close(&p->canRead);
- Event_Close(&p->canWrite);
- }
- static void CMtThread_Destruct(CMtThread *p)
- {
- CMtThread_CloseEvents(p);
- if (Thread_WasCreated(&p->thread.thread))
- {
- LoopThread_StopAndWait(&p->thread);
- LoopThread_Close(&p->thread);
- }
- if (p->mtCoder->alloc)
- IAlloc_Free(p->mtCoder->alloc, p->outBuf);
- p->outBuf = 0;
- if (p->mtCoder->alloc)
- IAlloc_Free(p->mtCoder->alloc, p->inBuf);
- p->inBuf = 0;
- }
- #define MY_BUF_ALLOC(buf, size, newSize) \
- if (buf == 0 || size != newSize) \
- { IAlloc_Free(p->mtCoder->alloc, buf); \
- size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
- if (buf == 0) return SZ_ERROR_MEM; }
- static SRes CMtThread_Prepare(CMtThread *p)
- {
- MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
- MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
- p->stopReading = False;
- p->stopWriting = False;
- RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
- RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
- return SZ_OK;
- }
- static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
- {
- size_t size = *processedSize;
- *processedSize = 0;
- while (size != 0)
- {
- size_t curSize = size;
- SRes res = stream->Read(stream, data, &curSize);
- *processedSize += curSize;
- data += curSize;
- size -= curSize;
- RINOK(res);
- if (curSize == 0)
- return SZ_OK;
- }
- return SZ_OK;
- }
- #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads - 1 ? 0 : p->index + 1]
- static SRes MtThread_Process(CMtThread *p, Bool *stop)
- {
- CMtThread *next;
- *stop = True;
- if (Event_Wait(&p->canRead) != 0)
- return SZ_ERROR_THREAD;
-
- next = GET_NEXT_THREAD(p);
-
- if (p->stopReading)
- {
- next->stopReading = True;
- return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
- }
- {
- size_t size = p->mtCoder->blockSize;
- size_t destSize = p->outBufSize;
- RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
- next->stopReading = *stop = (size != p->mtCoder->blockSize);
- if (Event_Set(&next->canRead) != 0)
- return SZ_ERROR_THREAD;
- RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
- p->outBuf, &destSize, p->inBuf, size, *stop));
- MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
- if (Event_Wait(&p->canWrite) != 0)
- return SZ_ERROR_THREAD;
- if (p->stopWriting)
- return SZ_ERROR_FAIL;
- if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
- return SZ_ERROR_WRITE;
- return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
- }
- }
- static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
- {
- CMtThread *p = (CMtThread *)pp;
- for (;;)
- {
- Bool stop;
- CMtThread *next = GET_NEXT_THREAD(p);
- SRes res = MtThread_Process(p, &stop);
- if (res != SZ_OK)
- {
- MtCoder_SetError(p->mtCoder, res);
- MtProgress_SetError(&p->mtCoder->mtProgress, res);
- next->stopReading = True;
- next->stopWriting = True;
- Event_Set(&next->canRead);
- Event_Set(&next->canWrite);
- return res;
- }
- if (stop)
- return 0;
- }
- }
- void MtCoder_Construct(CMtCoder* p)
- {
- unsigned i;
- p->alloc = 0;
- for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
- {
- CMtThread *t = &p->threads[i];
- t->index = i;
- CMtThread_Construct(t, p);
- }
- CriticalSection_Init(&p->cs);
- CriticalSection_Init(&p->mtProgress.cs);
- }
- void MtCoder_Destruct(CMtCoder* p)
- {
- unsigned i;
- for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
- CMtThread_Destruct(&p->threads[i]);
- CriticalSection_Delete(&p->cs);
- CriticalSection_Delete(&p->mtProgress.cs);
- }
- SRes MtCoder_Code(CMtCoder *p)
- {
- unsigned i, numThreads = p->numThreads;
- SRes res = SZ_OK;
- p->res = SZ_OK;
- MtProgress_Init(&p->mtProgress, p->progress);
- for (i = 0; i < numThreads; i++)
- {
- RINOK(CMtThread_Prepare(&p->threads[i]));
- }
- for (i = 0; i < numThreads; i++)
- {
- CMtThread *t = &p->threads[i];
- CLoopThread *lt = &t->thread;
- if (!Thread_WasCreated(<->thread))
- {
- lt->func = ThreadFunc;
- lt->param = t;
- if (LoopThread_Create(lt) != SZ_OK)
- {
- res = SZ_ERROR_THREAD;
- break;
- }
- }
- }
- if (res == SZ_OK)
- {
- unsigned j;
- for (i = 0; i < numThreads; i++)
- {
- CMtThread *t = &p->threads[i];
- if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
- {
- res = SZ_ERROR_THREAD;
- p->threads[0].stopReading = True;
- break;
- }
- }
- Event_Set(&p->threads[0].canWrite);
- Event_Set(&p->threads[0].canRead);
- for (j = 0; j < i; j++)
- LoopThread_WaitSubThread(&p->threads[j].thread);
- }
- for (i = 0; i < numThreads; i++)
- CMtThread_CloseEvents(&p->threads[i]);
- return (res == SZ_OK) ? p->res : res;
- }
|