MtDec.c 29 KB


  1. /* MtDec.c -- Multi-thread Decoder
  2. 2024-02-20 : Igor Pavlov : Public domain */
  3. #include "Precomp.h"
  4. // #define SHOW_DEBUG_INFO
  5. // #include <stdio.h>
  6. #include <string.h>
  7. #ifdef SHOW_DEBUG_INFO
  8. #include <stdio.h>
  9. #endif
  10. #include "MtDec.h"
  11. #ifndef Z7_ST
  12. #ifdef SHOW_DEBUG_INFO
  13. #define PRF(x) x
  14. #else
  15. #define PRF(x)
  16. #endif
  17. #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
  18. void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress)
  19. {
  20. p->progress = progress;
  21. p->res = SZ_OK;
  22. p->totalInSize = 0;
  23. p->totalOutSize = 0;
  24. }
  25. SRes MtProgress_Progress_ST(CMtProgress *p)
  26. {
  27. if (p->res == SZ_OK && p->progress)
  28. if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
  29. p->res = SZ_ERROR_PROGRESS;
  30. return p->res;
  31. }
  32. SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
  33. {
  34. SRes res;
  35. CriticalSection_Enter(&p->cs);
  36. p->totalInSize += inSize;
  37. p->totalOutSize += outSize;
  38. if (p->res == SZ_OK && p->progress)
  39. if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
  40. p->res = SZ_ERROR_PROGRESS;
  41. res = p->res;
  42. CriticalSection_Leave(&p->cs);
  43. return res;
  44. }
  45. SRes MtProgress_GetError(CMtProgress *p)
  46. {
  47. SRes res;
  48. CriticalSection_Enter(&p->cs);
  49. res = p->res;
  50. CriticalSection_Leave(&p->cs);
  51. return res;
  52. }
  53. void MtProgress_SetError(CMtProgress *p, SRes res)
  54. {
  55. CriticalSection_Enter(&p->cs);
  56. if (p->res == SZ_OK)
  57. p->res = res;
  58. CriticalSection_Leave(&p->cs);
  59. }
  60. #define RINOK_THREAD(x) RINOK_WRes(x)
  61. struct CMtDecBufLink_
  62. {
  63. struct CMtDecBufLink_ *next;
  64. void *pad[3];
  65. };
  66. typedef struct CMtDecBufLink_ CMtDecBufLink;
  67. #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
  68. #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
  69. static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp);
  70. static WRes MtDecThread_CreateEvents(CMtDecThread *t)
  71. {
  72. WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite);
  73. if (wres == 0)
  74. {
  75. wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead);
  76. if (wres == 0)
  77. return SZ_OK;
  78. }
  79. return wres;
  80. }
  81. static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
  82. {
  83. WRes wres = MtDecThread_CreateEvents(t);
  84. // wres = 17; // for test
  85. if (wres == 0)
  86. {
  87. if (Thread_WasCreated(&t->thread))
  88. return SZ_OK;
  89. wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t);
  90. if (wres == 0)
  91. return SZ_OK;
  92. }
  93. return MY_SRes_HRESULT_FROM_WRes(wres);
  94. }
  95. void MtDecThread_FreeInBufs(CMtDecThread *t)
  96. {
  97. if (t->inBuf)
  98. {
  99. void *link = t->inBuf;
  100. t->inBuf = NULL;
  101. do
  102. {
  103. void *next = ((CMtDecBufLink *)link)->next;
  104. ISzAlloc_Free(t->mtDec->alloc, link);
  105. link = next;
  106. }
  107. while (link);
  108. }
  109. }
  110. static void MtDecThread_CloseThread(CMtDecThread *t)
  111. {
  112. if (Thread_WasCreated(&t->thread))
  113. {
  114. Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
  115. Event_Set(&t->canRead);
  116. Thread_Wait_Close(&t->thread);
  117. }
  118. Event_Close(&t->canRead);
  119. Event_Close(&t->canWrite);
  120. }
  121. static void MtDec_CloseThreads(CMtDec *p)
  122. {
  123. unsigned i;
  124. for (i = 0; i < MTDEC_THREADS_MAX; i++)
  125. MtDecThread_CloseThread(&p->threads[i]);
  126. }
  127. static void MtDecThread_Destruct(CMtDecThread *t)
  128. {
  129. MtDecThread_CloseThread(t);
  130. MtDecThread_FreeInBufs(t);
  131. }
  132. static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
  133. {
  134. SRes res;
  135. CriticalSection_Enter(&p->mtProgress.cs);
  136. *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  137. res = p->mtProgress.res;
  138. CriticalSection_Leave(&p->mtProgress.cs);
  139. return res;
  140. }
  141. static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
  142. {
  143. SRes res;
  144. CriticalSection_Enter(&p->mtProgress.cs);
  145. p->mtProgress.totalInSize += inSize;
  146. p->mtProgress.totalOutSize += outSize;
  147. if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
  148. if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
  149. p->mtProgress.res = SZ_ERROR_PROGRESS;
  150. *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  151. res = p->mtProgress.res;
  152. CriticalSection_Leave(&p->mtProgress.cs);
  153. return res;
  154. }
  155. static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
  156. {
  157. CriticalSection_Enter(&p->mtProgress.cs);
  158. if (!p->needInterrupt || interruptIndex < p->interruptIndex)
  159. {
  160. p->interruptIndex = interruptIndex;
  161. p->needInterrupt = True;
  162. }
  163. CriticalSection_Leave(&p->mtProgress.cs);
  164. }
  165. Byte *MtDec_GetCrossBuff(CMtDec *p)
  166. {
  167. Byte *cr = p->crossBlock;
  168. if (!cr)
  169. {
  170. cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
  171. if (!cr)
  172. return NULL;
  173. p->crossBlock = cr;
  174. }
  175. return MTDEC__DATA_PTR_FROM_LINK(cr);
  176. }
/*
  MtDec_ThreadFunc2() returns:
  0      - in all normal cases (even for a stream error or a memory allocation error)
  (!= 0) - WRes error returned by a system threading function
*/
  182. // #define MTDEC_ProgessStep (1 << 22)
  183. #define MTDEC_ProgessStep (1 << 0)
  184. static WRes MtDec_ThreadFunc2(CMtDecThread *t)
  185. {
  186. CMtDec *p = t->mtDec;
  187. PRF_STR_INT("MtDec_ThreadFunc2", t->index)
  188. // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
  189. for (;;)
  190. {
  191. SRes res, codeRes;
  192. BoolInt wasInterrupted, isAllocError, overflow, finish;
  193. SRes threadingErrorSRes;
  194. BoolInt needCode, needWrite, needContinue;
  195. size_t inDataSize_Start;
  196. UInt64 inDataSize;
  197. // UInt64 inDataSize_Full;
  198. UInt64 blockIndex;
  199. UInt64 inPrev = 0;
  200. UInt64 outPrev = 0;
  201. UInt64 inCodePos;
  202. UInt64 outCodePos;
  203. Byte *afterEndData = NULL;
  204. size_t afterEndData_Size = 0;
  205. BoolInt afterEndData_IsCross = False;
  206. BoolInt canCreateNewThread = False;
  207. // CMtDecCallbackInfo parse;
  208. CMtDecThread *nextThread;
  209. PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index)
  210. RINOK_THREAD(Event_Wait(&t->canRead))
  211. if (p->exitThread)
  212. return 0;
  213. PRF_STR_INT("after Event_Wait(&t->canRead)", t->index)
  214. // if (t->index == 3) return 19; // for test
  215. blockIndex = p->blockIndex++;
  216. // PRF(printf("\ncanRead\n"))
  217. res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
  218. finish = p->readWasFinished;
  219. needCode = False;
  220. needWrite = False;
  221. isAllocError = False;
  222. overflow = False;
  223. inDataSize_Start = 0;
  224. inDataSize = 0;
  225. // inDataSize_Full = 0;
  226. if (res == SZ_OK && !wasInterrupted)
  227. {
  228. // if (p->inStream)
  229. {
  230. CMtDecBufLink *prev = NULL;
  231. CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
  232. size_t crossSize = p->crossEnd - p->crossStart;
  233. PRF(printf("\ncrossSize = %d\n", crossSize));
  234. for (;;)
  235. {
  236. if (!link)
  237. {
  238. link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
  239. if (!link)
  240. {
  241. finish = True;
  242. // p->allocError_for_Read_BlockIndex = blockIndex;
  243. isAllocError = True;
  244. break;
  245. }
  246. link->next = NULL;
  247. if (prev)
  248. {
  249. // static unsigned g_num = 0;
  250. // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
  251. prev->next = link;
  252. }
  253. else
  254. t->inBuf = (void *)link;
  255. }
  256. {
  257. Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
  258. Byte *parseData = data;
  259. size_t size;
  260. if (crossSize != 0)
  261. {
  262. inDataSize = crossSize;
  263. // inDataSize_Full = inDataSize;
  264. inDataSize_Start = crossSize;
  265. size = crossSize;
  266. parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
  267. PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
  268. (int)p->crossStart, (int)p->crossEnd, (int)finish));
  269. }
  270. else
  271. {
  272. size = p->inBufSize;
  273. res = SeqInStream_ReadMax(p->inStream, data, &size);
  274. // size = 10; // test
  275. inDataSize += size;
  276. // inDataSize_Full = inDataSize;
  277. if (!prev)
  278. inDataSize_Start = size;
  279. p->readProcessed += size;
  280. finish = (size != p->inBufSize);
  281. if (finish)
  282. p->readWasFinished = True;
  283. // res = E_INVALIDARG; // test
  284. if (res != SZ_OK)
  285. {
  286. // PRF(printf("\nRead error = %d\n", res))
  287. // we want to decode all data before error
  288. p->readRes = res;
  289. // p->readError_BlockIndex = blockIndex;
  290. p->readWasFinished = True;
  291. finish = True;
  292. res = SZ_OK;
  293. // break;
  294. }
  295. if (inDataSize - inPrev >= MTDEC_ProgessStep)
  296. {
  297. res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
  298. if (res != SZ_OK || wasInterrupted)
  299. break;
  300. inPrev = inDataSize;
  301. }
  302. }
  303. {
  304. CMtDecCallbackInfo parse;
  305. parse.startCall = (prev == NULL);
  306. parse.src = parseData;
  307. parse.srcSize = size;
  308. parse.srcFinished = finish;
  309. parse.canCreateNewThread = True;
  310. PRF(printf("\nParse size = %d\n", (unsigned)size));
  311. p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
  312. PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
  313. needWrite = True;
  314. canCreateNewThread = parse.canCreateNewThread;
  315. // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
  316. if (
  317. // parseRes != SZ_OK ||
  318. // inDataSize - (size - parse.srcSize) > p->inBlockMax
  319. // ||
  320. parse.state == MTDEC_PARSE_OVERFLOW
  321. // || wasInterrupted
  322. )
  323. {
  324. // Overflow or Parse error - switch from MT decoding to ST decoding
  325. finish = True;
  326. overflow = True;
  327. {
  328. PRF(printf("\n Overflow"));
  329. // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
  330. PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
  331. }
  332. if (crossSize != 0)
  333. memcpy(data, parseData, size);
  334. p->crossStart = 0;
  335. p->crossEnd = 0;
  336. break;
  337. }
  338. if (crossSize != 0)
  339. {
  340. memcpy(data, parseData, parse.srcSize);
  341. p->crossStart += parse.srcSize;
  342. }
  343. if (parse.state != MTDEC_PARSE_CONTINUE || finish)
  344. {
  345. // we don't need to parse in current thread anymore
  346. if (parse.state == MTDEC_PARSE_END)
  347. finish = True;
  348. needCode = True;
  349. // p->crossFinished = finish;
  350. if (parse.srcSize == size)
  351. {
  352. // full parsed - no cross transfer
  353. p->crossStart = 0;
  354. p->crossEnd = 0;
  355. break;
  356. }
  357. if (parse.state == MTDEC_PARSE_END)
  358. {
  359. afterEndData = parseData + parse.srcSize;
  360. afterEndData_Size = size - parse.srcSize;
  361. if (crossSize != 0)
  362. afterEndData_IsCross = True;
  363. // we reduce data size to required bytes (parsed only)
  364. inDataSize -= afterEndData_Size;
  365. if (!prev)
  366. inDataSize_Start = parse.srcSize;
  367. break;
  368. }
  369. {
  370. // partial parsed - need cross transfer
  371. if (crossSize != 0)
  372. inDataSize = parse.srcSize; // it's only parsed now
  373. else
  374. {
  375. // partial parsed - is not in initial cross block - we need to copy new data to cross block
  376. Byte *cr = MtDec_GetCrossBuff(p);
  377. if (!cr)
  378. {
  379. {
  380. PRF(printf("\ncross alloc error error\n"));
  381. // res = SZ_ERROR_MEM;
  382. finish = True;
  383. // p->allocError_for_Read_BlockIndex = blockIndex;
  384. isAllocError = True;
  385. break;
  386. }
  387. }
  388. {
  389. size_t crSize = size - parse.srcSize;
  390. inDataSize -= crSize;
  391. p->crossEnd = crSize;
  392. p->crossStart = 0;
  393. memcpy(cr, parseData + parse.srcSize, crSize);
  394. }
  395. }
  396. // inDataSize_Full = inDataSize;
  397. if (!prev)
  398. inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
  399. finish = False;
  400. break;
  401. }
  402. }
  403. if (parse.srcSize != size)
  404. {
  405. res = SZ_ERROR_FAIL;
  406. PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
  407. break;
  408. }
  409. }
  410. }
  411. prev = link;
  412. link = link->next;
  413. if (crossSize != 0)
  414. {
  415. crossSize = 0;
  416. p->crossStart = 0;
  417. p->crossEnd = 0;
  418. }
  419. }
  420. }
  421. if (res == SZ_OK)
  422. res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
  423. }
  424. codeRes = SZ_OK;
  425. if (res == SZ_OK && needCode && !wasInterrupted)
  426. {
  427. codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
  428. if (codeRes != SZ_OK)
  429. {
  430. needCode = False;
  431. finish = True;
  432. // SZ_ERROR_MEM is expected error here.
  433. // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
  434. // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
  435. }
  436. }
  437. if (res != SZ_OK || wasInterrupted)
  438. finish = True;
  439. nextThread = NULL;
  440. threadingErrorSRes = SZ_OK;
  441. if (!finish)
  442. {
  443. if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
  444. {
  445. SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
  446. if (res2 == SZ_OK)
  447. {
  448. // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
  449. p->numStartedThreads++;
  450. }
  451. else
  452. {
  453. PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
  454. if (p->numStartedThreads == 1)
  455. {
  456. // if only one thread is possible, we leave muti-threading code
  457. finish = True;
  458. needCode = False;
  459. threadingErrorSRes = res2;
  460. }
  461. else
  462. p->numStartedThreads_Limit = p->numStartedThreads;
  463. }
  464. }
  465. if (!finish)
  466. {
  467. unsigned nextIndex = t->index + 1;
  468. nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
  469. RINOK_THREAD(Event_Set(&nextThread->canRead))
  470. // We have started executing for new iteration (with next thread)
  471. // And that next thread now is responsible for possible exit from decoding (threading_code)
  472. }
  473. }
  474. // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
  475. // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
  476. // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
  477. // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
  478. // - otherwise we stop decoding and exit from MtDec_ThreadFunc2()
  479. // Don't change (finish) variable in the further code
  480. // ---------- CODE ----------
  481. inPrev = 0;
  482. outPrev = 0;
  483. inCodePos = 0;
  484. outCodePos = 0;
  485. if (res == SZ_OK && needCode && codeRes == SZ_OK)
  486. {
  487. BoolInt isStartBlock = True;
  488. CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
  489. for (;;)
  490. {
  491. size_t inSize;
  492. int stop;
  493. if (isStartBlock)
  494. inSize = inDataSize_Start;
  495. else
  496. {
  497. UInt64 rem = inDataSize - inCodePos;
  498. inSize = p->inBufSize;
  499. if (inSize > rem)
  500. inSize = (size_t)rem;
  501. }
  502. inCodePos += inSize;
  503. stop = True;
  504. codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
  505. (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
  506. (inCodePos == inDataSize), // srcFinished
  507. &inCodePos, &outCodePos, &stop);
  508. if (codeRes != SZ_OK)
  509. {
  510. PRF(printf("\nCode Interrupt error = %x\n", codeRes));
  511. // we interrupt only later blocks
  512. MtDec_Interrupt(p, blockIndex);
  513. break;
  514. }
  515. if (stop || inCodePos == inDataSize)
  516. break;
  517. {
  518. const UInt64 inDelta = inCodePos - inPrev;
  519. const UInt64 outDelta = outCodePos - outPrev;
  520. if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
  521. {
  522. // Sleep(1);
  523. res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
  524. if (res != SZ_OK || wasInterrupted)
  525. break;
  526. inPrev = inCodePos;
  527. outPrev = outCodePos;
  528. }
  529. }
  530. link = link->next;
  531. isStartBlock = False;
  532. }
  533. }
  534. // ---------- WRITE ----------
  535. RINOK_THREAD(Event_Wait(&t->canWrite))
  536. {
  537. BoolInt isErrorMode = False;
  538. BoolInt canRecode = True;
  539. BoolInt needWriteToStream = needWrite;
  540. if (p->exitThread) return 0; // it's never executed in normal cases
  541. if (p->wasInterrupted)
  542. wasInterrupted = True;
  543. else
  544. {
  545. if (codeRes != SZ_OK) // || !needCode // check it !!!
  546. {
  547. p->wasInterrupted = True;
  548. p->codeRes = codeRes;
  549. if (codeRes == SZ_ERROR_MEM)
  550. isAllocError = True;
  551. }
  552. if (threadingErrorSRes)
  553. {
  554. p->wasInterrupted = True;
  555. p->threadingErrorSRes = threadingErrorSRes;
  556. needWriteToStream = False;
  557. }
  558. if (isAllocError)
  559. {
  560. p->wasInterrupted = True;
  561. p->isAllocError = True;
  562. needWriteToStream = False;
  563. }
  564. if (overflow)
  565. {
  566. p->wasInterrupted = True;
  567. p->overflow = True;
  568. needWriteToStream = False;
  569. }
  570. }
  571. if (needCode)
  572. {
  573. if (wasInterrupted)
  574. {
  575. inCodePos = 0;
  576. outCodePos = 0;
  577. }
  578. {
  579. const UInt64 inDelta = inCodePos - inPrev;
  580. const UInt64 outDelta = outCodePos - outPrev;
  581. // if (inDelta != 0 || outDelta != 0)
  582. res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
  583. }
  584. }
  585. needContinue = (!finish);
  586. // if (res == SZ_OK && needWrite && !wasInterrupted)
  587. if (needWrite)
  588. {
  589. // p->inProcessed += inCodePos;
  590. PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
  591. res = p->mtCallback->Write(p->mtCallbackObject, t->index,
  592. res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
  593. afterEndData, afterEndData_Size, afterEndData_IsCross,
  594. &needContinue,
  595. &canRecode);
  596. // res = SZ_ERROR_FAIL; // for test
  597. PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
  598. PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
  599. if (res != SZ_OK)
  600. {
  601. PRF(printf("\nWrite error = %d\n", res));
  602. isErrorMode = True;
  603. p->wasInterrupted = True;
  604. }
  605. if (res != SZ_OK
  606. || (!needContinue && !finish))
  607. {
  608. PRF(printf("\nWrite Interrupt error = %x\n", res));
  609. MtDec_Interrupt(p, blockIndex);
  610. }
  611. }
  612. if (canRecode)
  613. if (!needCode
  614. || res != SZ_OK
  615. || p->wasInterrupted
  616. || codeRes != SZ_OK
  617. || wasInterrupted
  618. || p->numFilledThreads != 0
  619. || isErrorMode)
  620. {
  621. if (p->numFilledThreads == 0)
  622. p->filledThreadStart = t->index;
  623. if (inDataSize != 0 || !finish)
  624. {
  625. t->inDataSize_Start = inDataSize_Start;
  626. t->inDataSize = inDataSize;
  627. p->numFilledThreads++;
  628. }
  629. PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
  630. PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
  631. }
  632. if (!finish)
  633. {
  634. RINOK_THREAD(Event_Set(&nextThread->canWrite))
  635. }
  636. else
  637. {
  638. if (needContinue)
  639. {
  640. // we restore decoding with new iteration
  641. RINOK_THREAD(Event_Set(&p->threads[0].canWrite))
  642. }
  643. else
  644. {
  645. // we exit from decoding
  646. if (t->index == 0)
  647. return SZ_OK;
  648. p->exitThread = True;
  649. }
  650. RINOK_THREAD(Event_Set(&p->threads[0].canRead))
  651. }
  652. }
  653. }
  654. }
  655. #ifdef _WIN32
  656. #define USE_ALLOCA
  657. #endif
  658. #ifdef USE_ALLOCA
  659. #ifdef _WIN32
  660. #include <malloc.h>
  661. #else
  662. #include <stdlib.h>
  663. #endif
  664. #endif
  665. typedef
  666. #ifdef _WIN32
  667. UINT_PTR
  668. #elif 1
  669. uintptr_t
  670. #else
  671. ptrdiff_t
  672. #endif
  673. MY_uintptr_t;
  674. static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp)
  675. {
  676. WRes res;
  677. CMtDecThread *t = (CMtDecThread *)pp;
  678. CMtDec *p;
  679. // fprintf(stdout, "\n%d = %p\n", t->index, &t);
  680. res = MtDec_ThreadFunc2(t);
  681. p = t->mtDec;
  682. if (res == 0)
  683. return (THREAD_FUNC_RET_TYPE)(MY_uintptr_t)p->exitThreadWRes;
  684. {
  685. // it's unexpected situation for some threading function error
  686. if (p->exitThreadWRes == 0)
  687. p->exitThreadWRes = res;
  688. PRF(printf("\nthread exit error = %d\n", res));
  689. p->exitThread = True;
  690. Event_Set(&p->threads[0].canRead);
  691. Event_Set(&p->threads[0].canWrite);
  692. MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
  693. }
  694. return (THREAD_FUNC_RET_TYPE)(MY_uintptr_t)res;
  695. }
  696. static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp)
  697. {
  698. #ifdef USE_ALLOCA
  699. CMtDecThread *t = (CMtDecThread *)pp;
  700. // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  701. t->allocaPtr = alloca(t->index * 128);
  702. #endif
  703. return MtDec_ThreadFunc1(pp);
  704. }
  705. int MtDec_PrepareRead(CMtDec *p)
  706. {
  707. if (p->crossBlock && p->crossStart == p->crossEnd)
  708. {
  709. ISzAlloc_Free(p->alloc, p->crossBlock);
  710. p->crossBlock = NULL;
  711. }
  712. {
  713. unsigned i;
  714. for (i = 0; i < MTDEC_THREADS_MAX; i++)
  715. if (i > p->numStartedThreads
  716. || p->numFilledThreads <=
  717. (i >= p->filledThreadStart ?
  718. i - p->filledThreadStart :
  719. i + p->numStartedThreads - p->filledThreadStart))
  720. MtDecThread_FreeInBufs(&p->threads[i]);
  721. }
  722. return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
  723. }
  724. const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
  725. {
  726. while (p->numFilledThreads != 0)
  727. {
  728. CMtDecThread *t = &p->threads[p->filledThreadStart];
  729. if (*inLim != 0)
  730. {
  731. {
  732. void *link = t->inBuf;
  733. void *next = ((CMtDecBufLink *)link)->next;
  734. ISzAlloc_Free(p->alloc, link);
  735. t->inBuf = next;
  736. }
  737. if (t->inDataSize == 0)
  738. {
  739. MtDecThread_FreeInBufs(t);
  740. if (--p->numFilledThreads == 0)
  741. break;
  742. if (++p->filledThreadStart == p->numStartedThreads)
  743. p->filledThreadStart = 0;
  744. t = &p->threads[p->filledThreadStart];
  745. }
  746. }
  747. {
  748. size_t lim = t->inDataSize_Start;
  749. if (lim != 0)
  750. t->inDataSize_Start = 0;
  751. else
  752. {
  753. UInt64 rem = t->inDataSize;
  754. lim = p->inBufSize;
  755. if (lim > rem)
  756. lim = (size_t)rem;
  757. }
  758. t->inDataSize -= lim;
  759. *inLim = lim;
  760. return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
  761. }
  762. }
  763. {
  764. size_t crossSize = p->crossEnd - p->crossStart;
  765. if (crossSize != 0)
  766. {
  767. const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
  768. *inLim = crossSize;
  769. p->crossStart = 0;
  770. p->crossEnd = 0;
  771. return data;
  772. }
  773. *inLim = 0;
  774. if (p->crossBlock)
  775. {
  776. ISzAlloc_Free(p->alloc, p->crossBlock);
  777. p->crossBlock = NULL;
  778. }
  779. return NULL;
  780. }
  781. }
  782. void MtDec_Construct(CMtDec *p)
  783. {
  784. unsigned i;
  785. p->inBufSize = (size_t)1 << 18;
  786. p->numThreadsMax = 0;
  787. p->inStream = NULL;
  788. // p->inData = NULL;
  789. // p->inDataSize = 0;
  790. p->crossBlock = NULL;
  791. p->crossStart = 0;
  792. p->crossEnd = 0;
  793. p->numFilledThreads = 0;
  794. p->progress = NULL;
  795. p->alloc = NULL;
  796. p->mtCallback = NULL;
  797. p->mtCallbackObject = NULL;
  798. p->allocatedBufsSize = 0;
  799. for (i = 0; i < MTDEC_THREADS_MAX; i++)
  800. {
  801. CMtDecThread *t = &p->threads[i];
  802. t->mtDec = p;
  803. t->index = i;
  804. t->inBuf = NULL;
  805. Event_Construct(&t->canRead);
  806. Event_Construct(&t->canWrite);
  807. Thread_CONSTRUCT(&t->thread)
  808. }
  809. // Event_Construct(&p->finishedEvent);
  810. CriticalSection_Init(&p->mtProgress.cs);
  811. }
  812. static void MtDec_Free(CMtDec *p)
  813. {
  814. unsigned i;
  815. p->exitThread = True;
  816. for (i = 0; i < MTDEC_THREADS_MAX; i++)
  817. MtDecThread_Destruct(&p->threads[i]);
  818. // Event_Close(&p->finishedEvent);
  819. if (p->crossBlock)
  820. {
  821. ISzAlloc_Free(p->alloc, p->crossBlock);
  822. p->crossBlock = NULL;
  823. }
  824. }
  825. void MtDec_Destruct(CMtDec *p)
  826. {
  827. MtDec_Free(p);
  828. CriticalSection_Delete(&p->mtProgress.cs);
  829. }
  830. SRes MtDec_Code(CMtDec *p)
  831. {
  832. unsigned i;
  833. p->inProcessed = 0;
  834. p->blockIndex = 1; // it must be larger than not_defined index (0)
  835. p->isAllocError = False;
  836. p->overflow = False;
  837. p->threadingErrorSRes = SZ_OK;
  838. p->needContinue = True;
  839. p->readWasFinished = False;
  840. p->needInterrupt = False;
  841. p->interruptIndex = (UInt64)(Int64)-1;
  842. p->readProcessed = 0;
  843. p->readRes = SZ_OK;
  844. p->codeRes = SZ_OK;
  845. p->wasInterrupted = False;
  846. p->crossStart = 0;
  847. p->crossEnd = 0;
  848. p->filledThreadStart = 0;
  849. p->numFilledThreads = 0;
  850. {
  851. unsigned numThreads = p->numThreadsMax;
  852. if (numThreads > MTDEC_THREADS_MAX)
  853. numThreads = MTDEC_THREADS_MAX;
  854. p->numStartedThreads_Limit = numThreads;
  855. p->numStartedThreads = 0;
  856. }
  857. if (p->inBufSize != p->allocatedBufsSize)
  858. {
  859. for (i = 0; i < MTDEC_THREADS_MAX; i++)
  860. {
  861. CMtDecThread *t = &p->threads[i];
  862. if (t->inBuf)
  863. MtDecThread_FreeInBufs(t);
  864. }
  865. if (p->crossBlock)
  866. {
  867. ISzAlloc_Free(p->alloc, p->crossBlock);
  868. p->crossBlock = NULL;
  869. }
  870. p->allocatedBufsSize = p->inBufSize;
  871. }
  872. MtProgress_Init(&p->mtProgress, p->progress);
  873. // RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
  874. p->exitThread = False;
  875. p->exitThreadWRes = 0;
  876. {
  877. WRes wres;
  878. SRes sres;
  879. CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
  880. // wres = MtDecThread_CreateAndStart(nextThread);
  881. wres = MtDecThread_CreateEvents(nextThread);
  882. if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
  883. if (wres == 0) { wres = Event_Set(&nextThread->canRead);
  884. if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread);
  885. wres = (WRes)(MY_uintptr_t)res;
  886. if (wres != 0)
  887. {
  888. p->needContinue = False;
  889. MtDec_CloseThreads(p);
  890. }}}}
  891. // wres = 17; // for test
  892. // wres = Event_Wait(&p->finishedEvent);
  893. sres = MY_SRes_HRESULT_FROM_WRes(wres);
  894. if (sres != 0)
  895. p->threadingErrorSRes = sres;
  896. if (
  897. // wres == 0
  898. // wres != 0
  899. // || p->mtc.codeRes == SZ_ERROR_MEM
  900. p->isAllocError
  901. || p->threadingErrorSRes != SZ_OK
  902. || p->overflow)
  903. {
  904. // p->needContinue = True;
  905. }
  906. else
  907. p->needContinue = False;
  908. if (p->needContinue)
  909. return SZ_OK;
  910. // if (sres != SZ_OK)
  911. return sres;
  912. // return SZ_ERROR_FAIL;
  913. }
  914. }
  915. #endif
  916. #undef PRF