Pzstd.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. /*
  2. * Copyright (c) 2016-present, Facebook, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. */
  9. #include "platform.h" /* Large Files support, SET_BINARY_MODE */
  10. #include "Pzstd.h"
  11. #include "SkippableFrame.h"
  12. #include "utils/FileSystem.h"
  13. #include "utils/Range.h"
  14. #include "utils/ScopeGuard.h"
  15. #include "utils/ThreadPool.h"
  16. #include "utils/WorkQueue.h"
  17. #include <chrono>
  18. #include <cinttypes>
  19. #include <cstddef>
  20. #include <cstdio>
  21. #include <memory>
  22. #include <string>
  23. namespace pzstd {
  24. namespace {
  25. #ifdef _WIN32
  26. const std::string nullOutput = "nul";
  27. #else
  28. const std::string nullOutput = "/dev/null";
  29. #endif
  30. }
  31. using std::size_t;
  32. static std::uintmax_t fileSizeOrZero(const std::string &file) {
  33. if (file == "-") {
  34. return 0;
  35. }
  36. std::error_code ec;
  37. auto size = file_size(file, ec);
  38. if (ec) {
  39. size = 0;
  40. }
  41. return size;
  42. }
  43. static std::uint64_t handleOneInput(const Options &options,
  44. const std::string &inputFile,
  45. FILE* inputFd,
  46. const std::string &outputFile,
  47. FILE* outputFd,
  48. SharedState& state) {
  49. auto inputSize = fileSizeOrZero(inputFile);
  50. // WorkQueue outlives ThreadPool so in the case of error we are certain
  51. // we don't accidentally try to call push() on it after it is destroyed
  52. WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
  53. std::uint64_t bytesRead;
  54. std::uint64_t bytesWritten;
  55. {
  56. // Initialize the (de)compression thread pool with numThreads
  57. ThreadPool executor(options.numThreads);
  58. // Run the reader thread on an extra thread
  59. ThreadPool readExecutor(1);
  60. if (!options.decompress) {
  61. // Add a job that reads the input and starts all the compression jobs
  62. readExecutor.add(
  63. [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
  64. bytesRead = asyncCompressChunks(
  65. state,
  66. outs,
  67. executor,
  68. inputFd,
  69. inputSize,
  70. options.numThreads,
  71. options.determineParameters());
  72. });
  73. // Start writing
  74. bytesWritten = writeFile(state, outs, outputFd, options.decompress);
  75. } else {
  76. // Add a job that reads the input and starts all the decompression jobs
  77. readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
  78. bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
  79. });
  80. // Start writing
  81. bytesWritten = writeFile(state, outs, outputFd, options.decompress);
  82. }
  83. }
  84. if (!state.errorHolder.hasError()) {
  85. std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
  86. std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
  87. if (!options.decompress) {
  88. double ratio = static_cast<double>(bytesWritten) /
  89. static_cast<double>(bytesRead + !bytesRead);
  90. state.log(kLogInfo, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
  91. " bytes, %s)\n",
  92. inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
  93. outputFileName.c_str());
  94. } else {
  95. state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
  96. inputFileName.c_str(),bytesWritten);
  97. }
  98. }
  99. return bytesWritten;
  100. }
  101. static FILE *openInputFile(const std::string &inputFile,
  102. ErrorHolder &errorHolder) {
  103. if (inputFile == "-") {
  104. SET_BINARY_MODE(stdin);
  105. return stdin;
  106. }
  107. // Check if input file is a directory
  108. {
  109. std::error_code ec;
  110. if (is_directory(inputFile, ec)) {
  111. errorHolder.setError("Output file is a directory -- ignored");
  112. return nullptr;
  113. }
  114. }
  115. auto inputFd = std::fopen(inputFile.c_str(), "rb");
  116. if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
  117. return nullptr;
  118. }
  119. return inputFd;
  120. }
  121. static FILE *openOutputFile(const Options &options,
  122. const std::string &outputFile,
  123. SharedState& state) {
  124. if (outputFile == "-") {
  125. SET_BINARY_MODE(stdout);
  126. return stdout;
  127. }
  128. // Check if the output file exists and then open it
  129. if (!options.overwrite && outputFile != nullOutput) {
  130. auto outputFd = std::fopen(outputFile.c_str(), "rb");
  131. if (outputFd != nullptr) {
  132. std::fclose(outputFd);
  133. if (!state.log.logsAt(kLogInfo)) {
  134. state.errorHolder.setError("Output file exists");
  135. return nullptr;
  136. }
  137. state.log(
  138. kLogInfo,
  139. "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
  140. outputFile.c_str());
  141. int c = getchar();
  142. if (c != 'y' && c != 'Y') {
  143. state.errorHolder.setError("Not overwritten");
  144. return nullptr;
  145. }
  146. }
  147. }
  148. auto outputFd = std::fopen(outputFile.c_str(), "wb");
  149. if (!state.errorHolder.check(
  150. outputFd != nullptr, "Failed to open output file")) {
  151. return nullptr;
  152. }
  153. return outputFd;
  154. }
  155. int pzstdMain(const Options &options) {
  156. int returnCode = 0;
  157. SharedState state(options);
  158. for (const auto& input : options.inputFiles) {
  159. // Setup the shared state
  160. auto printErrorGuard = makeScopeGuard([&] {
  161. if (state.errorHolder.hasError()) {
  162. returnCode = 1;
  163. state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
  164. state.errorHolder.getError().c_str());
  165. }
  166. });
  167. // Open the input file
  168. auto inputFd = openInputFile(input, state.errorHolder);
  169. if (inputFd == nullptr) {
  170. continue;
  171. }
  172. auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
  173. // Open the output file
  174. auto outputFile = options.getOutputFile(input);
  175. if (!state.errorHolder.check(outputFile != "",
  176. "Input file does not have extension .zst")) {
  177. continue;
  178. }
  179. auto outputFd = openOutputFile(options, outputFile, state);
  180. if (outputFd == nullptr) {
  181. continue;
  182. }
  183. auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
  184. // (de)compress the file
  185. handleOneInput(options, input, inputFd, outputFile, outputFd, state);
  186. if (state.errorHolder.hasError()) {
  187. continue;
  188. }
  189. // Delete the input file if necessary
  190. if (!options.keepSource) {
  191. // Be sure that we are done and have written everything before we delete
  192. if (!state.errorHolder.check(std::fclose(inputFd) == 0,
  193. "Failed to close input file")) {
  194. continue;
  195. }
  196. closeInputGuard.dismiss();
  197. if (!state.errorHolder.check(std::fclose(outputFd) == 0,
  198. "Failed to close output file")) {
  199. continue;
  200. }
  201. closeOutputGuard.dismiss();
  202. if (std::remove(input.c_str()) != 0) {
  203. state.errorHolder.setError("Failed to remove input file");
  204. continue;
  205. }
  206. }
  207. }
  208. // Returns 1 if any of the files failed to (de)compress.
  209. return returnCode;
  210. }
  211. /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
  212. static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
  213. return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
  214. }
  215. /**
  216. * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
  217. * `inBuffer.pos`.
  218. */
  219. void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
  220. auto pos = inBuffer.pos;
  221. inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
  222. inBuffer.size -= pos;
  223. inBuffer.pos = 0;
  224. return buffer.advance(pos);
  225. }
  226. /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
  227. static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
  228. return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
  229. }
  230. /**
  231. * Split `buffer` and advance `outBuffer` by the amount of data written, as
  232. * indicated by `outBuffer.pos`.
  233. */
  234. Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
  235. auto pos = outBuffer.pos;
  236. outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
  237. outBuffer.size -= pos;
  238. outBuffer.pos = 0;
  239. return buffer.splitAt(pos);
  240. }
  241. /**
  242. * Stream chunks of input from `in`, compress it, and stream it out to `out`.
  243. *
  244. * @param state The shared state
  245. * @param in Queue that we `pop()` input buffers from
  246. * @param out Queue that we `push()` compressed output buffers to
  247. * @param maxInputSize An upper bound on the size of the input
  248. */
  249. static void compress(
  250. SharedState& state,
  251. std::shared_ptr<BufferWorkQueue> in,
  252. std::shared_ptr<BufferWorkQueue> out,
  253. size_t maxInputSize) {
  254. auto& errorHolder = state.errorHolder;
  255. auto guard = makeScopeGuard([&] { out->finish(); });
  256. // Initialize the CCtx
  257. auto ctx = state.cStreamPool->get();
  258. if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
  259. return;
  260. }
  261. {
  262. auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
  263. if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
  264. return;
  265. }
  266. }
  267. // Allocate space for the result
  268. auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
  269. auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
  270. {
  271. Buffer inBuffer;
  272. // Read a buffer in from the input queue
  273. while (in->pop(inBuffer) && !errorHolder.hasError()) {
  274. auto zstdInBuffer = makeZstdInBuffer(inBuffer);
  275. // Compress the whole buffer and send it to the output queue
  276. while (!inBuffer.empty() && !errorHolder.hasError()) {
  277. if (!errorHolder.check(
  278. !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
  279. return;
  280. }
  281. // Compress
  282. auto err =
  283. ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
  284. if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
  285. return;
  286. }
  287. // Split the compressed data off outBuffer and pass to the output queue
  288. out->push(split(outBuffer, zstdOutBuffer));
  289. // Forget about the data we already compressed
  290. advance(inBuffer, zstdInBuffer);
  291. }
  292. }
  293. }
  294. // Write the epilog
  295. size_t bytesLeft;
  296. do {
  297. if (!errorHolder.check(
  298. !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
  299. return;
  300. }
  301. bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
  302. if (!errorHolder.check(
  303. !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
  304. return;
  305. }
  306. out->push(split(outBuffer, zstdOutBuffer));
  307. } while (bytesLeft != 0 && !errorHolder.hasError());
  308. }
  309. /**
  310. * Calculates how large each independently compressed frame should be.
  311. *
  312. * @param size The size of the source if known, 0 otherwise
  313. * @param numThreads The number of threads available to run compression jobs on
  314. * @param params The zstd parameters to be used for compression
  315. */
  316. static size_t calculateStep(
  317. std::uintmax_t size,
  318. size_t numThreads,
  319. const ZSTD_parameters &params) {
  320. (void)size;
  321. (void)numThreads;
  322. return size_t{1} << (params.cParams.windowLog + 2);
  323. }
  324. namespace {
  325. enum class FileStatus { Continue, Done, Error };
  326. /// Determines the status of the file descriptor `fd`.
  327. FileStatus fileStatus(FILE* fd) {
  328. if (std::feof(fd)) {
  329. return FileStatus::Done;
  330. } else if (std::ferror(fd)) {
  331. return FileStatus::Error;
  332. }
  333. return FileStatus::Continue;
  334. }
  335. } // anonymous namespace
  336. /**
  337. * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
  338. * Will read less if an error or EOF occurs.
  339. * Returns the status of the file after all of the reads have occurred.
  340. */
  341. static FileStatus
  342. readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
  343. std::uint64_t *totalBytesRead) {
  344. Buffer buffer(size);
  345. while (!buffer.empty()) {
  346. auto bytesRead =
  347. std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
  348. *totalBytesRead += bytesRead;
  349. queue.push(buffer.splitAt(bytesRead));
  350. auto status = fileStatus(fd);
  351. if (status != FileStatus::Continue) {
  352. return status;
  353. }
  354. }
  355. return FileStatus::Continue;
  356. }
  357. std::uint64_t asyncCompressChunks(
  358. SharedState& state,
  359. WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
  360. ThreadPool& executor,
  361. FILE* fd,
  362. std::uintmax_t size,
  363. size_t numThreads,
  364. ZSTD_parameters params) {
  365. auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
  366. std::uint64_t bytesRead = 0;
  367. // Break the input up into chunks of size `step` and compress each chunk
  368. // independently.
  369. size_t step = calculateStep(size, numThreads, params);
  370. state.log(kLogDebug, "Chosen frame size: %zu\n", step);
  371. auto status = FileStatus::Continue;
  372. while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
  373. // Make a new input queue that we will put the chunk's input data into.
  374. auto in = std::make_shared<BufferWorkQueue>();
  375. auto inGuard = makeScopeGuard([&] { in->finish(); });
  376. // Make a new output queue that compress will put the compressed data into.
  377. auto out = std::make_shared<BufferWorkQueue>();
  378. // Start compression in the thread pool
  379. executor.add([&state, in, out, step] {
  380. return compress(
  381. state, std::move(in), std::move(out), step);
  382. });
  383. // Pass the output queue to the writer thread.
  384. chunks.push(std::move(out));
  385. state.log(kLogVerbose, "%s\n", "Starting a new frame");
  386. // Fill the input queue for the compression job we just started
  387. status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
  388. }
  389. state.errorHolder.check(status != FileStatus::Error, "Error reading input");
  390. return bytesRead;
  391. }
  392. /**
  393. * Decompress a frame, whose data is streamed into `in`, and stream the output
  394. * to `out`.
  395. *
  396. * @param state The shared state
  397. * @param in Queue that we `pop()` input buffers from. It contains
  398. * exactly one compressed frame.
  399. * @param out Queue that we `push()` decompressed output buffers to
  400. */
  401. static void decompress(
  402. SharedState& state,
  403. std::shared_ptr<BufferWorkQueue> in,
  404. std::shared_ptr<BufferWorkQueue> out) {
  405. auto& errorHolder = state.errorHolder;
  406. auto guard = makeScopeGuard([&] { out->finish(); });
  407. // Initialize the DCtx
  408. auto ctx = state.dStreamPool->get();
  409. if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
  410. return;
  411. }
  412. {
  413. auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
  414. if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
  415. return;
  416. }
  417. }
  418. const size_t outSize = ZSTD_DStreamOutSize();
  419. Buffer inBuffer;
  420. size_t returnCode = 0;
  421. // Read a buffer in from the input queue
  422. while (in->pop(inBuffer) && !errorHolder.hasError()) {
  423. auto zstdInBuffer = makeZstdInBuffer(inBuffer);
  424. // Decompress the whole buffer and send it to the output queue
  425. while (!inBuffer.empty() && !errorHolder.hasError()) {
  426. // Allocate a buffer with at least outSize bytes.
  427. Buffer outBuffer(outSize);
  428. auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
  429. // Decompress
  430. returnCode =
  431. ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
  432. if (!errorHolder.check(
  433. !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
  434. return;
  435. }
  436. // Pass the buffer with the decompressed data to the output queue
  437. out->push(split(outBuffer, zstdOutBuffer));
  438. // Advance past the input we already read
  439. advance(inBuffer, zstdInBuffer);
  440. if (returnCode == 0) {
  441. // The frame is over, prepare to (maybe) start a new frame
  442. ZSTD_initDStream(ctx.get());
  443. }
  444. }
  445. }
  446. if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
  447. return;
  448. }
  449. // We've given ZSTD_decompressStream all of our data, but there may still
  450. // be data to read.
  451. while (returnCode == 1) {
  452. // Allocate a buffer with at least outSize bytes.
  453. Buffer outBuffer(outSize);
  454. auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
  455. // Pass in no input.
  456. ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
  457. // Decompress
  458. returnCode =
  459. ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
  460. if (!errorHolder.check(
  461. !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
  462. return;
  463. }
  464. // Pass the buffer with the decompressed data to the output queue
  465. out->push(split(outBuffer, zstdOutBuffer));
  466. }
  467. }
  468. std::uint64_t asyncDecompressFrames(
  469. SharedState& state,
  470. WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
  471. ThreadPool& executor,
  472. FILE* fd) {
  473. auto framesGuard = makeScopeGuard([&] { frames.finish(); });
  474. std::uint64_t totalBytesRead = 0;
  475. // Split the source up into its component frames.
  476. // If we find our recognized skippable frame we know the next frames size
  477. // which means that we can decompress each standard frame in independently.
  478. // Otherwise, we will decompress using only one decompression task.
  479. const size_t chunkSize = ZSTD_DStreamInSize();
  480. auto status = FileStatus::Continue;
  481. while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
  482. // Make a new input queue that we will put the frames's bytes into.
  483. auto in = std::make_shared<BufferWorkQueue>();
  484. auto inGuard = makeScopeGuard([&] { in->finish(); });
  485. // Make a output queue that decompress will put the decompressed data into
  486. auto out = std::make_shared<BufferWorkQueue>();
  487. size_t frameSize;
  488. {
  489. // Calculate the size of the next frame.
  490. // frameSize is 0 if the frame info can't be decoded.
  491. Buffer buffer(SkippableFrame::kSize);
  492. auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
  493. totalBytesRead += bytesRead;
  494. status = fileStatus(fd);
  495. if (bytesRead == 0 && status != FileStatus::Continue) {
  496. break;
  497. }
  498. buffer.subtract(buffer.size() - bytesRead);
  499. frameSize = SkippableFrame::tryRead(buffer.range());
  500. in->push(std::move(buffer));
  501. }
  502. if (frameSize == 0) {
  503. // We hit a non SkippableFrame, so this will be the last job.
  504. // Make sure that we don't use too much memory
  505. in->setMaxSize(64);
  506. out->setMaxSize(64);
  507. }
  508. // Start decompression in the thread pool
  509. executor.add([&state, in, out] {
  510. return decompress(state, std::move(in), std::move(out));
  511. });
  512. // Pass the output queue to the writer thread
  513. frames.push(std::move(out));
  514. if (frameSize == 0) {
  515. // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
  516. // Pass the rest of the source to this decompression task
  517. state.log(kLogVerbose, "%s\n",
  518. "Input not in pzstd format, falling back to serial decompression");
  519. while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
  520. status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
  521. }
  522. break;
  523. }
  524. state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
  525. // Fill the input queue for the decompression job we just started
  526. status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
  527. }
  528. state.errorHolder.check(status != FileStatus::Error, "Error reading input");
  529. return totalBytesRead;
  530. }
  531. /// Write `data` to `fd`, returns true iff success.
  532. static bool writeData(ByteRange data, FILE* fd) {
  533. while (!data.empty()) {
  534. data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
  535. if (std::ferror(fd)) {
  536. return false;
  537. }
  538. }
  539. return true;
  540. }
  541. std::uint64_t writeFile(
  542. SharedState& state,
  543. WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
  544. FILE* outputFd,
  545. bool decompress) {
  546. auto& errorHolder = state.errorHolder;
  547. auto lineClearGuard = makeScopeGuard([&state] {
  548. state.log.clear(kLogInfo);
  549. });
  550. std::uint64_t bytesWritten = 0;
  551. std::shared_ptr<BufferWorkQueue> out;
  552. // Grab the output queue for each decompression job (in order).
  553. while (outs.pop(out)) {
  554. if (errorHolder.hasError()) {
  555. continue;
  556. }
  557. if (!decompress) {
  558. // If we are compressing and want to write skippable frames we can't
  559. // start writing before compression is done because we need to know the
  560. // compressed size.
  561. // Wait for the compressed size to be available and write skippable frame
  562. SkippableFrame frame(out->size());
  563. if (!writeData(frame.data(), outputFd)) {
  564. errorHolder.setError("Failed to write output");
  565. return bytesWritten;
  566. }
  567. bytesWritten += frame.kSize;
  568. }
  569. // For each chunk of the frame: Pop it from the queue and write it
  570. Buffer buffer;
  571. while (out->pop(buffer) && !errorHolder.hasError()) {
  572. if (!writeData(buffer.range(), outputFd)) {
  573. errorHolder.setError("Failed to write output");
  574. return bytesWritten;
  575. }
  576. bytesWritten += buffer.size();
  577. state.log.update(kLogInfo, "Written: %u MB ",
  578. static_cast<std::uint32_t>(bytesWritten >> 20));
  579. }
  580. }
  581. return bytesWritten;
  582. }
  583. }