MtCoder.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. /* MtCoder.c -- Multi-thread Coder
  2. 2023-09-07 : Igor Pavlov : Public domain */
  3. #include "Precomp.h"
  4. #include "MtCoder.h"
  5. #ifndef Z7_ST
  6. static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
  7. {
  8. Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
  9. UInt64 inSize2 = 0;
  10. UInt64 outSize2 = 0;
  11. if (inSize != (UInt64)(Int64)-1)
  12. {
  13. inSize2 = inSize - p->inSize;
  14. p->inSize = inSize;
  15. }
  16. if (outSize != (UInt64)(Int64)-1)
  17. {
  18. outSize2 = outSize - p->outSize;
  19. p->outSize = outSize;
  20. }
  21. return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
  22. }
  23. void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
  24. {
  25. p->vt.Progress = MtProgressThunk_Progress;
  26. }
  27. #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
  28. static THREAD_FUNC_DECL ThreadFunc(void *pp);
  29. static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
  30. {
  31. WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
  32. if (wres == 0)
  33. {
  34. t->stop = False;
  35. if (!Thread_WasCreated(&t->thread))
  36. wres = Thread_Create(&t->thread, ThreadFunc, t);
  37. if (wres == 0)
  38. wres = Event_Set(&t->startEvent);
  39. }
  40. if (wres == 0)
  41. return SZ_OK;
  42. return MY_SRes_HRESULT_FROM_WRes(wres);
  43. }
  44. static void MtCoderThread_Destruct(CMtCoderThread *t)
  45. {
  46. if (Thread_WasCreated(&t->thread))
  47. {
  48. t->stop = 1;
  49. Event_Set(&t->startEvent);
  50. Thread_Wait_Close(&t->thread);
  51. }
  52. Event_Close(&t->startEvent);
  53. if (t->inBuf)
  54. {
  55. ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
  56. t->inBuf = NULL;
  57. }
  58. }
  59. /*
  60. ThreadFunc2() returns:
  61. SZ_OK - in all normal cases (even for stream error or memory allocation error)
  62. SZ_ERROR_THREAD - in case of failure in system synch function
  63. */
  64. static SRes ThreadFunc2(CMtCoderThread *t)
  65. {
  66. CMtCoder *mtc = t->mtCoder;
  67. for (;;)
  68. {
  69. unsigned bi;
  70. SRes res;
  71. SRes res2;
  72. BoolInt finished;
  73. unsigned bufIndex;
  74. size_t size;
  75. const Byte *inData;
  76. UInt64 readProcessed = 0;
  77. RINOK_THREAD(Event_Wait(&mtc->readEvent))
  78. /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
  79. if (mtc->stopReading)
  80. {
  81. return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
  82. }
  83. res = MtProgress_GetError(&mtc->mtProgress);
  84. size = 0;
  85. inData = NULL;
  86. finished = True;
  87. if (res == SZ_OK)
  88. {
  89. size = mtc->blockSize;
  90. if (mtc->inStream)
  91. {
  92. if (!t->inBuf)
  93. {
  94. t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
  95. if (!t->inBuf)
  96. res = SZ_ERROR_MEM;
  97. }
  98. if (res == SZ_OK)
  99. {
  100. res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
  101. readProcessed = mtc->readProcessed + size;
  102. mtc->readProcessed = readProcessed;
  103. }
  104. if (res != SZ_OK)
  105. {
  106. mtc->readRes = res;
  107. /* after reading error - we can stop encoding of previous blocks */
  108. MtProgress_SetError(&mtc->mtProgress, res);
  109. }
  110. else
  111. finished = (size != mtc->blockSize);
  112. }
  113. else
  114. {
  115. size_t rem;
  116. readProcessed = mtc->readProcessed;
  117. rem = mtc->inDataSize - (size_t)readProcessed;
  118. if (size > rem)
  119. size = rem;
  120. inData = mtc->inData + (size_t)readProcessed;
  121. readProcessed += size;
  122. mtc->readProcessed = readProcessed;
  123. finished = (mtc->inDataSize == (size_t)readProcessed);
  124. }
  125. }
  126. /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
  127. res2 = SZ_OK;
  128. if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
  129. {
  130. res2 = SZ_ERROR_THREAD;
  131. if (res == SZ_OK)
  132. {
  133. res = res2;
  134. // MtProgress_SetError(&mtc->mtProgress, res);
  135. }
  136. }
  137. bi = mtc->blockIndex;
  138. if (++mtc->blockIndex >= mtc->numBlocksMax)
  139. mtc->blockIndex = 0;
  140. bufIndex = (unsigned)(int)-1;
  141. if (res == SZ_OK)
  142. res = MtProgress_GetError(&mtc->mtProgress);
  143. if (res != SZ_OK)
  144. finished = True;
  145. if (!finished)
  146. {
  147. if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
  148. && mtc->expectedDataSize != readProcessed)
  149. {
  150. res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
  151. if (res == SZ_OK)
  152. mtc->numStartedThreads++;
  153. else
  154. {
  155. MtProgress_SetError(&mtc->mtProgress, res);
  156. finished = True;
  157. }
  158. }
  159. }
  160. if (finished)
  161. mtc->stopReading = True;
  162. RINOK_THREAD(Event_Set(&mtc->readEvent))
  163. if (res2 != SZ_OK)
  164. return res2;
  165. if (res == SZ_OK)
  166. {
  167. CriticalSection_Enter(&mtc->cs);
  168. bufIndex = mtc->freeBlockHead;
  169. mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
  170. CriticalSection_Leave(&mtc->cs);
  171. res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
  172. mtc->inStream ? t->inBuf : inData, size, finished);
  173. // MtProgress_Reinit(&mtc->mtProgress, t->index);
  174. if (res != SZ_OK)
  175. MtProgress_SetError(&mtc->mtProgress, res);
  176. }
  177. {
  178. CMtCoderBlock *block = &mtc->blocks[bi];
  179. block->res = res;
  180. block->bufIndex = bufIndex;
  181. block->finished = finished;
  182. }
  183. #ifdef MTCODER_USE_WRITE_THREAD
  184. RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
  185. #else
  186. {
  187. unsigned wi;
  188. {
  189. CriticalSection_Enter(&mtc->cs);
  190. wi = mtc->writeIndex;
  191. if (wi == bi)
  192. mtc->writeIndex = (unsigned)(int)-1;
  193. else
  194. mtc->ReadyBlocks[bi] = True;
  195. CriticalSection_Leave(&mtc->cs);
  196. }
  197. if (wi != bi)
  198. {
  199. if (res != SZ_OK || finished)
  200. return 0;
  201. continue;
  202. }
  203. if (mtc->writeRes != SZ_OK)
  204. res = mtc->writeRes;
  205. for (;;)
  206. {
  207. if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
  208. {
  209. res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
  210. if (res != SZ_OK)
  211. {
  212. mtc->writeRes = res;
  213. MtProgress_SetError(&mtc->mtProgress, res);
  214. }
  215. }
  216. if (++wi >= mtc->numBlocksMax)
  217. wi = 0;
  218. {
  219. BoolInt isReady;
  220. CriticalSection_Enter(&mtc->cs);
  221. if (bufIndex != (unsigned)(int)-1)
  222. {
  223. mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
  224. mtc->freeBlockHead = bufIndex;
  225. }
  226. isReady = mtc->ReadyBlocks[wi];
  227. if (isReady)
  228. mtc->ReadyBlocks[wi] = False;
  229. else
  230. mtc->writeIndex = wi;
  231. CriticalSection_Leave(&mtc->cs);
  232. RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
  233. if (!isReady)
  234. break;
  235. }
  236. {
  237. CMtCoderBlock *block = &mtc->blocks[wi];
  238. if (res == SZ_OK && block->res != SZ_OK)
  239. res = block->res;
  240. bufIndex = block->bufIndex;
  241. finished = block->finished;
  242. }
  243. }
  244. }
  245. #endif
  246. if (finished || res != SZ_OK)
  247. return 0;
  248. }
  249. }
  250. static THREAD_FUNC_DECL ThreadFunc(void *pp)
  251. {
  252. CMtCoderThread *t = (CMtCoderThread *)pp;
  253. for (;;)
  254. {
  255. if (Event_Wait(&t->startEvent) != 0)
  256. return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
  257. if (t->stop)
  258. return 0;
  259. {
  260. SRes res = ThreadFunc2(t);
  261. CMtCoder *mtc = t->mtCoder;
  262. if (res != SZ_OK)
  263. {
  264. MtProgress_SetError(&mtc->mtProgress, res);
  265. }
  266. #ifndef MTCODER_USE_WRITE_THREAD
  267. {
  268. unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
  269. if (numFinished == mtc->numStartedThreads)
  270. if (Event_Set(&mtc->finishedEvent) != 0)
  271. return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
  272. }
  273. #endif
  274. }
  275. }
  276. }
  277. void MtCoder_Construct(CMtCoder *p)
  278. {
  279. unsigned i;
  280. p->blockSize = 0;
  281. p->numThreadsMax = 0;
  282. p->expectedDataSize = (UInt64)(Int64)-1;
  283. p->inStream = NULL;
  284. p->inData = NULL;
  285. p->inDataSize = 0;
  286. p->progress = NULL;
  287. p->allocBig = NULL;
  288. p->mtCallback = NULL;
  289. p->mtCallbackObject = NULL;
  290. p->allocatedBufsSize = 0;
  291. Event_Construct(&p->readEvent);
  292. Semaphore_Construct(&p->blocksSemaphore);
  293. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  294. {
  295. CMtCoderThread *t = &p->threads[i];
  296. t->mtCoder = p;
  297. t->index = i;
  298. t->inBuf = NULL;
  299. t->stop = False;
  300. Event_Construct(&t->startEvent);
  301. Thread_CONSTRUCT(&t->thread)
  302. }
  303. #ifdef MTCODER_USE_WRITE_THREAD
  304. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  305. Event_Construct(&p->writeEvents[i]);
  306. #else
  307. Event_Construct(&p->finishedEvent);
  308. #endif
  309. CriticalSection_Init(&p->cs);
  310. CriticalSection_Init(&p->mtProgress.cs);
  311. }
  312. static void MtCoder_Free(CMtCoder *p)
  313. {
  314. unsigned i;
  315. /*
  316. p->stopReading = True;
  317. if (Event_IsCreated(&p->readEvent))
  318. Event_Set(&p->readEvent);
  319. */
  320. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  321. MtCoderThread_Destruct(&p->threads[i]);
  322. Event_Close(&p->readEvent);
  323. Semaphore_Close(&p->blocksSemaphore);
  324. #ifdef MTCODER_USE_WRITE_THREAD
  325. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  326. Event_Close(&p->writeEvents[i]);
  327. #else
  328. Event_Close(&p->finishedEvent);
  329. #endif
  330. }
  331. void MtCoder_Destruct(CMtCoder *p)
  332. {
  333. MtCoder_Free(p);
  334. CriticalSection_Delete(&p->cs);
  335. CriticalSection_Delete(&p->mtProgress.cs);
  336. }
  337. SRes MtCoder_Code(CMtCoder *p)
  338. {
  339. unsigned numThreads = p->numThreadsMax;
  340. unsigned numBlocksMax;
  341. unsigned i;
  342. SRes res = SZ_OK;
  343. if (numThreads > MTCODER_THREADS_MAX)
  344. numThreads = MTCODER_THREADS_MAX;
  345. numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
  346. if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
  347. if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
  348. if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
  349. if (numBlocksMax > MTCODER_BLOCKS_MAX)
  350. numBlocksMax = MTCODER_BLOCKS_MAX;
  351. if (p->blockSize != p->allocatedBufsSize)
  352. {
  353. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  354. {
  355. CMtCoderThread *t = &p->threads[i];
  356. if (t->inBuf)
  357. {
  358. ISzAlloc_Free(p->allocBig, t->inBuf);
  359. t->inBuf = NULL;
  360. }
  361. }
  362. p->allocatedBufsSize = p->blockSize;
  363. }
  364. p->readRes = SZ_OK;
  365. MtProgress_Init(&p->mtProgress, p->progress);
  366. #ifdef MTCODER_USE_WRITE_THREAD
  367. for (i = 0; i < numBlocksMax; i++)
  368. {
  369. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
  370. }
  371. #else
  372. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
  373. #endif
  374. {
  375. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
  376. RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, (UInt32)numBlocksMax, (UInt32)numBlocksMax))
  377. }
  378. for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
  379. p->freeBlockList[i] = i + 1;
  380. p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
  381. p->freeBlockHead = 0;
  382. p->readProcessed = 0;
  383. p->blockIndex = 0;
  384. p->numBlocksMax = numBlocksMax;
  385. p->stopReading = False;
  386. #ifndef MTCODER_USE_WRITE_THREAD
  387. p->writeIndex = 0;
  388. p->writeRes = SZ_OK;
  389. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  390. p->ReadyBlocks[i] = False;
  391. p->numFinishedThreads = 0;
  392. #endif
  393. p->numStartedThreadsLimit = numThreads;
  394. p->numStartedThreads = 0;
  395. // for (i = 0; i < numThreads; i++)
  396. {
  397. CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
  398. RINOK(MtCoderThread_CreateAndStart(nextThread))
  399. }
  400. RINOK_THREAD(Event_Set(&p->readEvent))
  401. #ifdef MTCODER_USE_WRITE_THREAD
  402. {
  403. unsigned bi = 0;
  404. for (;; bi++)
  405. {
  406. if (bi >= numBlocksMax)
  407. bi = 0;
  408. RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
  409. {
  410. const CMtCoderBlock *block = &p->blocks[bi];
  411. unsigned bufIndex = block->bufIndex;
  412. BoolInt finished = block->finished;
  413. if (res == SZ_OK && block->res != SZ_OK)
  414. res = block->res;
  415. if (bufIndex != (unsigned)(int)-1)
  416. {
  417. if (res == SZ_OK)
  418. {
  419. res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
  420. if (res != SZ_OK)
  421. MtProgress_SetError(&p->mtProgress, res);
  422. }
  423. CriticalSection_Enter(&p->cs);
  424. {
  425. p->freeBlockList[bufIndex] = p->freeBlockHead;
  426. p->freeBlockHead = bufIndex;
  427. }
  428. CriticalSection_Leave(&p->cs);
  429. }
  430. RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
  431. if (finished)
  432. break;
  433. }
  434. }
  435. }
  436. #else
  437. {
  438. WRes wres = Event_Wait(&p->finishedEvent);
  439. res = MY_SRes_HRESULT_FROM_WRes(wres);
  440. }
  441. #endif
  442. if (res == SZ_OK)
  443. res = p->readRes;
  444. if (res == SZ_OK)
  445. res = p->mtProgress.res;
  446. #ifndef MTCODER_USE_WRITE_THREAD
  447. if (res == SZ_OK)
  448. res = p->writeRes;
  449. #endif
  450. if (res != SZ_OK)
  451. MtCoder_Free(p);
  452. return res;
  453. }
  454. #endif
  455. #undef RINOK_THREAD