MtCoder.c 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /* MtCoder.c -- Multi-thread Coder
  2. 2015-10-13 : Igor Pavlov : Public domain */
  3. #include "Precomp.h"
  4. #include "MtCoder.h"
  5. void LoopThread_Construct(CLoopThread *p)
  6. {
  7. Thread_Construct(&p->thread);
  8. Event_Construct(&p->startEvent);
  9. Event_Construct(&p->finishedEvent);
  10. }
  11. void LoopThread_Close(CLoopThread *p)
  12. {
  13. Thread_Close(&p->thread);
  14. Event_Close(&p->startEvent);
  15. Event_Close(&p->finishedEvent);
  16. }
  17. static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
  18. {
  19. CLoopThread *p = (CLoopThread *)pp;
  20. for (;;)
  21. {
  22. if (Event_Wait(&p->startEvent) != 0)
  23. return SZ_ERROR_THREAD;
  24. if (p->stop)
  25. return 0;
  26. p->res = p->func(p->param);
  27. if (Event_Set(&p->finishedEvent) != 0)
  28. return SZ_ERROR_THREAD;
  29. }
  30. }
  31. WRes LoopThread_Create(CLoopThread *p)
  32. {
  33. p->stop = 0;
  34. RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
  35. RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
  36. return Thread_Create(&p->thread, LoopThreadFunc, p);
  37. }
  38. WRes LoopThread_StopAndWait(CLoopThread *p)
  39. {
  40. p->stop = 1;
  41. if (Event_Set(&p->startEvent) != 0)
  42. return SZ_ERROR_THREAD;
  43. return Thread_Wait(&p->thread);
  44. }
  45. WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
  46. WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
  47. static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
  48. {
  49. return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
  50. }
  51. static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
  52. {
  53. unsigned i;
  54. for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
  55. p->inSizes[i] = p->outSizes[i] = 0;
  56. p->totalInSize = p->totalOutSize = 0;
  57. p->progress = progress;
  58. p->res = SZ_OK;
  59. }
  60. static void MtProgress_Reinit(CMtProgress *p, unsigned index)
  61. {
  62. p->inSizes[index] = 0;
  63. p->outSizes[index] = 0;
  64. }
  65. #define UPDATE_PROGRESS(size, prev, total) \
  66. if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
  67. SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
  68. {
  69. SRes res;
  70. CriticalSection_Enter(&p->cs);
  71. UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
  72. UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
  73. if (p->res == SZ_OK)
  74. p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
  75. res = p->res;
  76. CriticalSection_Leave(&p->cs);
  77. return res;
  78. }
  79. static void MtProgress_SetError(CMtProgress *p, SRes res)
  80. {
  81. CriticalSection_Enter(&p->cs);
  82. if (p->res == SZ_OK)
  83. p->res = res;
  84. CriticalSection_Leave(&p->cs);
  85. }
  86. static void MtCoder_SetError(CMtCoder* p, SRes res)
  87. {
  88. CriticalSection_Enter(&p->cs);
  89. if (p->res == SZ_OK)
  90. p->res = res;
  91. CriticalSection_Leave(&p->cs);
  92. }
  93. /* ---------- MtThread ---------- */
  94. void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
  95. {
  96. p->mtCoder = mtCoder;
  97. p->outBuf = 0;
  98. p->inBuf = 0;
  99. Event_Construct(&p->canRead);
  100. Event_Construct(&p->canWrite);
  101. LoopThread_Construct(&p->thread);
  102. }
  103. #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
  104. static void CMtThread_CloseEvents(CMtThread *p)
  105. {
  106. Event_Close(&p->canRead);
  107. Event_Close(&p->canWrite);
  108. }
  109. static void CMtThread_Destruct(CMtThread *p)
  110. {
  111. CMtThread_CloseEvents(p);
  112. if (Thread_WasCreated(&p->thread.thread))
  113. {
  114. LoopThread_StopAndWait(&p->thread);
  115. LoopThread_Close(&p->thread);
  116. }
  117. if (p->mtCoder->alloc)
  118. IAlloc_Free(p->mtCoder->alloc, p->outBuf);
  119. p->outBuf = 0;
  120. if (p->mtCoder->alloc)
  121. IAlloc_Free(p->mtCoder->alloc, p->inBuf);
  122. p->inBuf = 0;
  123. }
  124. #define MY_BUF_ALLOC(buf, size, newSize) \
  125. if (buf == 0 || size != newSize) \
  126. { IAlloc_Free(p->mtCoder->alloc, buf); \
  127. size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
  128. if (buf == 0) return SZ_ERROR_MEM; }
  129. static SRes CMtThread_Prepare(CMtThread *p)
  130. {
  131. MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
  132. MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
  133. p->stopReading = False;
  134. p->stopWriting = False;
  135. RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
  136. RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
  137. return SZ_OK;
  138. }
  139. static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
  140. {
  141. size_t size = *processedSize;
  142. *processedSize = 0;
  143. while (size != 0)
  144. {
  145. size_t curSize = size;
  146. SRes res = stream->Read(stream, data, &curSize);
  147. *processedSize += curSize;
  148. data += curSize;
  149. size -= curSize;
  150. RINOK(res);
  151. if (curSize == 0)
  152. return SZ_OK;
  153. }
  154. return SZ_OK;
  155. }
  156. #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads - 1 ? 0 : p->index + 1]
  157. static SRes MtThread_Process(CMtThread *p, Bool *stop)
  158. {
  159. CMtThread *next;
  160. *stop = True;
  161. if (Event_Wait(&p->canRead) != 0)
  162. return SZ_ERROR_THREAD;
  163. next = GET_NEXT_THREAD(p);
  164. if (p->stopReading)
  165. {
  166. next->stopReading = True;
  167. return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
  168. }
  169. {
  170. size_t size = p->mtCoder->blockSize;
  171. size_t destSize = p->outBufSize;
  172. RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
  173. next->stopReading = *stop = (size != p->mtCoder->blockSize);
  174. if (Event_Set(&next->canRead) != 0)
  175. return SZ_ERROR_THREAD;
  176. RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
  177. p->outBuf, &destSize, p->inBuf, size, *stop));
  178. MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
  179. if (Event_Wait(&p->canWrite) != 0)
  180. return SZ_ERROR_THREAD;
  181. if (p->stopWriting)
  182. return SZ_ERROR_FAIL;
  183. if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
  184. return SZ_ERROR_WRITE;
  185. return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
  186. }
  187. }
  188. static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
  189. {
  190. CMtThread *p = (CMtThread *)pp;
  191. for (;;)
  192. {
  193. Bool stop;
  194. CMtThread *next = GET_NEXT_THREAD(p);
  195. SRes res = MtThread_Process(p, &stop);
  196. if (res != SZ_OK)
  197. {
  198. MtCoder_SetError(p->mtCoder, res);
  199. MtProgress_SetError(&p->mtCoder->mtProgress, res);
  200. next->stopReading = True;
  201. next->stopWriting = True;
  202. Event_Set(&next->canRead);
  203. Event_Set(&next->canWrite);
  204. return res;
  205. }
  206. if (stop)
  207. return 0;
  208. }
  209. }
  210. void MtCoder_Construct(CMtCoder* p)
  211. {
  212. unsigned i;
  213. p->alloc = 0;
  214. for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
  215. {
  216. CMtThread *t = &p->threads[i];
  217. t->index = i;
  218. CMtThread_Construct(t, p);
  219. }
  220. CriticalSection_Init(&p->cs);
  221. CriticalSection_Init(&p->mtProgress.cs);
  222. }
  223. void MtCoder_Destruct(CMtCoder* p)
  224. {
  225. unsigned i;
  226. for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
  227. CMtThread_Destruct(&p->threads[i]);
  228. CriticalSection_Delete(&p->cs);
  229. CriticalSection_Delete(&p->mtProgress.cs);
  230. }
  231. SRes MtCoder_Code(CMtCoder *p)
  232. {
  233. unsigned i, numThreads = p->numThreads;
  234. SRes res = SZ_OK;
  235. p->res = SZ_OK;
  236. MtProgress_Init(&p->mtProgress, p->progress);
  237. for (i = 0; i < numThreads; i++)
  238. {
  239. RINOK(CMtThread_Prepare(&p->threads[i]));
  240. }
  241. for (i = 0; i < numThreads; i++)
  242. {
  243. CMtThread *t = &p->threads[i];
  244. CLoopThread *lt = &t->thread;
  245. if (!Thread_WasCreated(&lt->thread))
  246. {
  247. lt->func = ThreadFunc;
  248. lt->param = t;
  249. if (LoopThread_Create(lt) != SZ_OK)
  250. {
  251. res = SZ_ERROR_THREAD;
  252. break;
  253. }
  254. }
  255. }
  256. if (res == SZ_OK)
  257. {
  258. unsigned j;
  259. for (i = 0; i < numThreads; i++)
  260. {
  261. CMtThread *t = &p->threads[i];
  262. if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
  263. {
  264. res = SZ_ERROR_THREAD;
  265. p->threads[0].stopReading = True;
  266. break;
  267. }
  268. }
  269. Event_Set(&p->threads[0].canWrite);
  270. Event_Set(&p->threads[0].canRead);
  271. for (j = 0; j < i; j++)
  272. LoopThread_WaitSubThread(&p->threads[j].thread);
  273. }
  274. for (i = 0; i < numThreads; i++)
  275. CMtThread_CloseEvents(&p->threads[i]);
  276. return (res == SZ_OK) ? p->res : res;
  277. }