MtCoder.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. /* MtCoder.c -- Multi-thread Coder
  2. : Igor Pavlov : Public domain */
  3. #include "Precomp.h"
  4. #include "MtCoder.h"
  5. #ifndef Z7_ST
  6. static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
  7. {
  8. Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
  9. UInt64 inSize2 = 0;
  10. UInt64 outSize2 = 0;
  11. if (inSize != (UInt64)(Int64)-1)
  12. {
  13. inSize2 = inSize - p->inSize;
  14. p->inSize = inSize;
  15. }
  16. if (outSize != (UInt64)(Int64)-1)
  17. {
  18. outSize2 = outSize - p->outSize;
  19. p->outSize = outSize;
  20. }
  21. return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
  22. }
  23. void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
  24. {
  25. p->vt.Progress = MtProgressThunk_Progress;
  26. }
  /* RINOK_THREAD(x): if the OS / synchronization call x returns non-zero,
     return SZ_ERROR_THREAD from the enclosing function. It expands to a
     braced block, so call sites in this file write it without a trailing
     semicolon. */
  27. #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
  /* Forward declaration: MtCoderThread_CreateAndStart() passes ThreadFunc to
     Thread_Create() before ThreadFunc itself is defined further below. */
  28. static THREAD_FUNC_DECL ThreadFunc(void *pp);
  29. static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t
  30. #ifdef _WIN32
  31. , CMtCoder * const mtc
  32. #endif
  33. )
  34. {
  35. WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
  36. // printf("\n====== MtCoderThread_CreateAndStart : \n");
  37. if (wres == 0)
  38. {
  39. t->stop = False;
  40. if (!Thread_WasCreated(&t->thread))
  41. {
  42. #ifdef _WIN32
  43. if (mtc->numThreadGroups)
  44. wres = Thread_Create_With_Group(&t->thread, ThreadFunc, t,
  45. ThreadNextGroup_GetNext(&mtc->nextGroup), // group
  46. 0); // affinityMask
  47. else
  48. #endif
  49. wres = Thread_Create(&t->thread, ThreadFunc, t);
  50. }
  51. if (wres == 0)
  52. wres = Event_Set(&t->startEvent);
  53. }
  54. if (wres == 0)
  55. return SZ_OK;
  56. return MY_SRes_HRESULT_FROM_WRes(wres);
  57. }
  58. Z7_FORCE_INLINE
  59. static void MtCoderThread_Destruct(CMtCoderThread *t)
  60. {
  61. if (Thread_WasCreated(&t->thread))
  62. {
  63. t->stop = 1;
  64. Event_Set(&t->startEvent);
  65. Thread_Wait_Close(&t->thread);
  66. }
  67. Event_Close(&t->startEvent);
  68. if (t->inBuf)
  69. {
  70. ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
  71. t->inBuf = NULL;
  72. }
  73. }
  74. /*
  75. ThreadFunc2() returns:
  76. SZ_OK - in all normal cases (even for stream error or memory allocation error)
  77. SZ_ERROR_THREAD - in case of failure in system synch function
  78. */
  /*
    ThreadFunc2(): one worker session. Each loop iteration handles one block.
    1. Read stage. It is serialized by the auto-reset mtc->readEvent, which
       is held from Event_Wait until Event_Set. The thread takes the next
       input block: either it reads mtc->inStream into t->inBuf, or it
       slices mtc->inData. It then takes a slot from mtc->blocksSemaphore,
       claims ring index bi, and may start one more worker thread. If this
       is the last block or an error occurred, it sets mtc->stopReading.
    2. Code stage (parallel). The thread pops an output buffer index from
       the free list and calls mtc->mtCallback->Code().
    3. The result is published in mtc->blocks[bi]. With
       MTCODER_USE_WRITE_THREAD, the writer is signalled. Otherwise, if
       block bi is next in output order, this thread writes it and every
       following block already marked ready. It returns buffer indexes and
       semaphore slots as it goes.
    Read / code / write errors are reported through mtc->readRes,
    mtc->writeRes and mtc->mtProgress, not through the return value.
  */
  79. static SRes ThreadFunc2(CMtCoderThread *t)
  80. {
  81. CMtCoder * const mtc = t->mtCoder;
  82. for (;;)
  83. {
  84. unsigned bi;
  85. SRes res;
  86. SRes res2;
  87. BoolInt finished;
  88. unsigned bufIndex;
  89. size_t size;
  90. const Byte *inData;
  // input consumed up to the end of this block; compared with
  // mtc->expectedDataSize before starting another worker
  91. UInt64 readProcessed = 0;
  92. RINOK_THREAD(Event_Wait(&mtc->readEvent))
  93. /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
  // the last block was already taken (or reading stopped on error):
  // pass the read token on, so other waiting workers reach this exit too
  94. if (mtc->stopReading)
  95. {
  96. return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
  97. }
  98. res = MtProgress_GetError(&mtc->mtProgress);
  // defaults: an empty, final block (kept as-is if an error is already pending)
  99. size = 0;
  100. inData = NULL;
  101. finished = True;
  102. if (res == SZ_OK)
  103. {
  104. size = mtc->blockSize;
  // stream input: read into this thread's buffer, which is allocated on
  // first use with mtc->blockSize bytes
  105. if (mtc->inStream)
  106. {
  107. if (!t->inBuf)
  108. {
  109. t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
  110. if (!t->inBuf)
  111. res = SZ_ERROR_MEM;
  112. }
  113. if (res == SZ_OK)
  114. {
  115. res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
  116. readProcessed = mtc->readProcessed + size;
  117. mtc->readProcessed = readProcessed;
  118. }
  119. if (res != SZ_OK)
  120. {
  121. mtc->readRes = res;
  122. /* after reading error - we can stop encoding of previous blocks */
  123. MtProgress_SetError(&mtc->mtProgress, res);
  124. }
  125. else
  // fewer than blockSize bytes: treated as the last block
  126. finished = (size != mtc->blockSize);
  127. }
  128. else
  129. {
  // in-memory input: the block is a slice of mtc->inData (no copy)
  130. size_t rem;
  131. readProcessed = mtc->readProcessed;
  132. rem = mtc->inDataSize - (size_t)readProcessed;
  133. if (size > rem)
  134. size = rem;
  135. inData = mtc->inData + (size_t)readProcessed;
  136. readProcessed += size;
  137. mtc->readProcessed = readProcessed;
  138. finished = (mtc->inDataSize == (size_t)readProcessed);
  139. }
  140. }
  141. /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
  142. res2 = SZ_OK;
  // blocksSemaphore is created with count numBlocksMax in MtCoder_Code(), so at
  // most that many blocks are in flight; a slot is released after the block is written
  143. if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
  144. {
  145. res2 = SZ_ERROR_THREAD;
  146. if (res == SZ_OK)
  147. {
  148. res = res2;
  149. // MtProgress_SetError(&mtc->mtProgress, res);
  150. }
  151. }
  // claim the next ring slot (round-robin over numBlocksMax)
  152. bi = mtc->blockIndex;
  153. if (++mtc->blockIndex >= mtc->numBlocksMax)
  154. mtc->blockIndex = 0;
  // (unsigned)(int)-1 = no output buffer, i.e. nothing to write for this block
  155. bufIndex = (unsigned)(int)-1;
  156. if (res == SZ_OK)
  157. res = MtProgress_GetError(&mtc->mtProgress);
  158. if (res != SZ_OK)
  159. finished = True;
  // more input follows: start one more worker if numStartedThreadsLimit is not
  // reached and readProcessed has not reached expectedDataSize
  160. if (!finished)
  161. {
  162. if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
  163. && mtc->expectedDataSize != readProcessed)
  164. {
  165. res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]
  166. #ifdef _WIN32
  167. , mtc
  168. #endif
  169. );
  170. if (res == SZ_OK)
  171. mtc->numStartedThreads++;
  172. else
  173. {
  174. MtProgress_SetError(&mtc->mtProgress, res);
  175. finished = True;
  176. }
  177. }
  178. }
  // last block (or error): workers that wait on readEvent later will see
  // stopReading and exit
  179. if (finished)
  180. mtc->stopReading = True;
  // pass the read token on; the rest of this iteration runs in parallel
  // with the other workers
  181. RINOK_THREAD(Event_Set(&mtc->readEvent))
  // semaphore failure: the read token was already passed on above, so just abort
  182. if (res2 != SZ_OK)
  183. return res2;
  184. if (res == SZ_OK)
  185. {
  // pop a free output buffer index; the free list is guarded by mtc->cs
  186. CriticalSection_Enter(&mtc->cs);
  187. bufIndex = mtc->freeBlockHead;
  188. mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
  189. CriticalSection_Leave(&mtc->cs);
  190. res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
  191. mtc->inStream ? t->inBuf : inData, size, finished);
  192. // MtProgress_Reinit(&mtc->mtProgress, t->index);
  193. if (res != SZ_OK)
  194. MtProgress_SetError(&mtc->mtProgress, res);
  195. }
  // publish the outcome of block bi for whichever thread writes it
  196. {
  197. CMtCoderBlock * const block = &mtc->blocks[bi];
  198. block->res = res;
  199. block->bufIndex = bufIndex;
  200. block->finished = finished;
  201. }
  202. #ifdef MTCODER_USE_WRITE_THREAD
  203. RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
  204. #else
  205. {
  206. unsigned wi;
  // Claim the writer role. If writeIndex == bi, block bi is next in output
  // order and this thread writes it; writeIndex becomes -1, so no other
  // block matches while this thread writes. Otherwise bi is marked ready and
  // is picked up by the thread that writes the preceding blocks (loop below).
  207. {
  208. CriticalSection_Enter(&mtc->cs);
  209. wi = mtc->writeIndex;
  210. if (wi == bi)
  211. mtc->writeIndex = (unsigned)(int)-1;
  212. else
  213. mtc->ReadyBlocks[bi] = True;
  214. CriticalSection_Leave(&mtc->cs);
  215. }
  // not this block's turn: continue with the next block unless this one was
  // the last or failed
  216. if (wi != bi)
  217. {
  218. if (res != SZ_OK || finished)
  219. return 0;
  220. continue;
  221. }
  // an earlier write failure applies to all later blocks
  222. if (mtc->writeRes != SZ_OK)
  223. res = mtc->writeRes;
  // Writer loop. Write block wi if it has data and there is no error so far,
  // then return its buffer index and semaphore slot and advance to wi + 1.
  // If that block is ready, continue with it. Otherwise store wi in
  // writeIndex, so the worker that finishes it becomes the next writer.
  224. for (;;)
  225. {
  226. if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
  227. {
  228. res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
  229. if (res != SZ_OK)
  230. {
  231. mtc->writeRes = res;
  232. MtProgress_SetError(&mtc->mtProgress, res);
  233. }
  234. }
  235. if (++wi >= mtc->numBlocksMax)
  236. wi = 0;
  237. {
  238. BoolInt isReady;
  239. CriticalSection_Enter(&mtc->cs);
  240. if (bufIndex != (unsigned)(int)-1)
  241. {
  242. mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
  243. mtc->freeBlockHead = bufIndex;
  244. }
  245. isReady = mtc->ReadyBlocks[wi];
  246. if (isReady)
  247. mtc->ReadyBlocks[wi] = False;
  248. else
  249. mtc->writeIndex = wi;
  250. CriticalSection_Leave(&mtc->cs);
  251. RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
  252. if (!isReady)
  253. break;
  254. }
  255. {
  256. CMtCoderBlock *block = &mtc->blocks[wi];
  257. if (res == SZ_OK && block->res != SZ_OK)
  258. res = block->res;
  259. bufIndex = block->bufIndex;
  260. finished = block->finished;
  261. }
  262. }
  263. }
  264. #endif
  // stop after the last block or on the first error
  265. if (finished || res != SZ_OK)
  266. return 0;
  267. }
  268. }
  269. static THREAD_FUNC_DECL ThreadFunc(void *pp)
  270. {
  271. CMtCoderThread * const t = (CMtCoderThread *)pp;
  272. for (;;)
  273. {
  274. if (Event_Wait(&t->startEvent) != 0)
  275. return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
  276. if (t->stop)
  277. return 0;
  278. {
  279. const SRes res = ThreadFunc2(t);
  280. CMtCoder *mtc = t->mtCoder;
  281. if (res != SZ_OK)
  282. {
  283. MtProgress_SetError(&mtc->mtProgress, res);
  284. }
  285. #ifndef MTCODER_USE_WRITE_THREAD
  286. {
  287. const unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
  288. if (numFinished == mtc->numStartedThreads)
  289. if (Event_Set(&mtc->finishedEvent) != 0)
  290. return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
  291. }
  292. #endif
  293. }
  294. }
  295. }
  296. void MtCoder_Construct(CMtCoder *p)
  297. {
  298. unsigned i;
  299. p->blockSize = 0;
  300. p->numThreadsMax = 0;
  301. p->numThreadGroups = 0;
  302. p->expectedDataSize = (UInt64)(Int64)-1;
  303. p->inStream = NULL;
  304. p->inData = NULL;
  305. p->inDataSize = 0;
  306. p->progress = NULL;
  307. p->allocBig = NULL;
  308. p->mtCallback = NULL;
  309. p->mtCallbackObject = NULL;
  310. p->allocatedBufsSize = 0;
  311. Event_Construct(&p->readEvent);
  312. Semaphore_Construct(&p->blocksSemaphore);
  313. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  314. {
  315. CMtCoderThread *t = &p->threads[i];
  316. t->mtCoder = p;
  317. t->index = i;
  318. t->inBuf = NULL;
  319. t->stop = False;
  320. Event_Construct(&t->startEvent);
  321. Thread_CONSTRUCT(&t->thread)
  322. }
  323. #ifdef MTCODER_USE_WRITE_THREAD
  324. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  325. Event_Construct(&p->writeEvents[i]);
  326. #else
  327. Event_Construct(&p->finishedEvent);
  328. #endif
  329. CriticalSection_Init(&p->cs);
  330. CriticalSection_Init(&p->mtProgress.cs);
  331. }
  332. static void MtCoder_Free(CMtCoder *p)
  333. {
  334. unsigned i;
  335. /*
  336. p->stopReading = True;
  337. if (Event_IsCreated(&p->readEvent))
  338. Event_Set(&p->readEvent);
  339. */
  340. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  341. MtCoderThread_Destruct(&p->threads[i]);
  342. Event_Close(&p->readEvent);
  343. Semaphore_Close(&p->blocksSemaphore);
  344. #ifdef MTCODER_USE_WRITE_THREAD
  345. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  346. Event_Close(&p->writeEvents[i]);
  347. #else
  348. Event_Close(&p->finishedEvent);
  349. #endif
  350. }
  351. void MtCoder_Destruct(CMtCoder *p)
  352. {
  353. MtCoder_Free(p);
  354. CriticalSection_Delete(&p->cs);
  355. CriticalSection_Delete(&p->mtProgress.cs);
  356. }
  357. SRes MtCoder_Code(CMtCoder *p)
  358. {
  359. unsigned numThreads = p->numThreadsMax;
  360. unsigned numBlocksMax;
  361. unsigned i;
  362. SRes res = SZ_OK;
  363. // printf("\n====== MtCoder_Code : \n");
  364. if (numThreads > MTCODER_THREADS_MAX)
  365. numThreads = MTCODER_THREADS_MAX;
  366. numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
  367. if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
  368. if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
  369. if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
  370. if (numBlocksMax > MTCODER_BLOCKS_MAX)
  371. numBlocksMax = MTCODER_BLOCKS_MAX;
  372. if (p->blockSize != p->allocatedBufsSize)
  373. {
  374. for (i = 0; i < MTCODER_THREADS_MAX; i++)
  375. {
  376. CMtCoderThread *t = &p->threads[i];
  377. if (t->inBuf)
  378. {
  379. ISzAlloc_Free(p->allocBig, t->inBuf);
  380. t->inBuf = NULL;
  381. }
  382. }
  383. p->allocatedBufsSize = p->blockSize;
  384. }
  385. p->readRes = SZ_OK;
  386. MtProgress_Init(&p->mtProgress, p->progress);
  387. #ifdef MTCODER_USE_WRITE_THREAD
  388. for (i = 0; i < numBlocksMax; i++)
  389. {
  390. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
  391. }
  392. #else
  393. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
  394. #endif
  395. {
  396. RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
  397. RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, (UInt32)numBlocksMax, (UInt32)numBlocksMax))
  398. }
  399. for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
  400. p->freeBlockList[i] = i + 1;
  401. p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
  402. p->freeBlockHead = 0;
  403. p->readProcessed = 0;
  404. p->blockIndex = 0;
  405. p->numBlocksMax = numBlocksMax;
  406. p->stopReading = False;
  407. #ifndef MTCODER_USE_WRITE_THREAD
  408. p->writeIndex = 0;
  409. p->writeRes = SZ_OK;
  410. for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
  411. p->ReadyBlocks[i] = False;
  412. p->numFinishedThreads = 0;
  413. #endif
  414. p->numStartedThreadsLimit = numThreads;
  415. p->numStartedThreads = 0;
  416. ThreadNextGroup_Init(&p->nextGroup, p->numThreadGroups, 0); // startGroup
  417. // for (i = 0; i < numThreads; i++)
  418. {
  419. // here we create new thread for first block.
  420. // And each new thread will create another new thread after block reading
  421. // until numStartedThreadsLimit is reached.
  422. CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
  423. {
  424. const SRes res2 = MtCoderThread_CreateAndStart(nextThread
  425. #ifdef _WIN32
  426. , p
  427. #endif
  428. );
  429. RINOK(res2)
  430. }
  431. }
  432. RINOK_THREAD(Event_Set(&p->readEvent))
  433. #ifdef MTCODER_USE_WRITE_THREAD
  434. {
  435. unsigned bi = 0;
  436. for (;; bi++)
  437. {
  438. if (bi >= numBlocksMax)
  439. bi = 0;
  440. RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
  441. {
  442. const CMtCoderBlock * const block = &p->blocks[bi];
  443. const unsigned bufIndex = block->bufIndex;
  444. const BoolInt finished = block->finished;
  445. if (res == SZ_OK && block->res != SZ_OK)
  446. res = block->res;
  447. if (bufIndex != (unsigned)(int)-1)
  448. {
  449. if (res == SZ_OK)
  450. {
  451. res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
  452. if (res != SZ_OK)
  453. MtProgress_SetError(&p->mtProgress, res);
  454. }
  455. CriticalSection_Enter(&p->cs);
  456. {
  457. p->freeBlockList[bufIndex] = p->freeBlockHead;
  458. p->freeBlockHead = bufIndex;
  459. }
  460. CriticalSection_Leave(&p->cs);
  461. }
  462. RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
  463. if (finished)
  464. break;
  465. }
  466. }
  467. }
  468. #else
  469. {
  470. const WRes wres = Event_Wait(&p->finishedEvent);
  471. res = MY_SRes_HRESULT_FROM_WRes(wres);
  472. }
  473. #endif
  474. if (res == SZ_OK)
  475. res = p->readRes;
  476. if (res == SZ_OK)
  477. res = p->mtProgress.res;
  478. #ifndef MTCODER_USE_WRITE_THREAD
  479. if (res == SZ_OK)
  480. res = p->writeRes;
  481. #endif
  482. if (res != SZ_OK)
  483. MtCoder_Free(p);
  484. return res;
  485. }
  486. #endif
  487. #undef RINOK_THREAD