123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611 |
- /*
- * Copyright (c) 2016-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed under both the BSD-style license (found in the
- * LICENSE file in the root directory of this source tree) and the GPLv2 (found
- * in the COPYING file in the root directory of this source tree).
- */
- #include "platform.h" /* Large Files support, SET_BINARY_MODE */
- #include "Pzstd.h"
- #include "SkippableFrame.h"
- #include "utils/FileSystem.h"
- #include "utils/Range.h"
- #include "utils/ScopeGuard.h"
- #include "utils/ThreadPool.h"
- #include "utils/WorkQueue.h"
- #include <chrono>
- #include <cinttypes>
- #include <cstddef>
- #include <cstdio>
- #include <memory>
- #include <string>
- namespace pzstd {
namespace {
// Name of the platform's null device. openOutputFile() skips its
// "output already exists" check when the output path equals this name.
#ifndef _WIN32
const std::string nullOutput = "/dev/null";
#else
const std::string nullOutput = "nul";
#endif
} // anonymous namespace
- using std::size_t;
- static std::uintmax_t fileSizeOrZero(const std::string &file) {
- if (file == "-") {
- return 0;
- }
- std::error_code ec;
- auto size = file_size(file, ec);
- if (ec) {
- size = 0;
- }
- return size;
- }
- static std::uint64_t handleOneInput(const Options &options,
- const std::string &inputFile,
- FILE* inputFd,
- const std::string &outputFile,
- FILE* outputFd,
- SharedState& state) {
- auto inputSize = fileSizeOrZero(inputFile);
- // WorkQueue outlives ThreadPool so in the case of error we are certain
- // we don't accidentally try to call push() on it after it is destroyed
- WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
- std::uint64_t bytesRead;
- std::uint64_t bytesWritten;
- {
- // Initialize the (de)compression thread pool with numThreads
- ThreadPool executor(options.numThreads);
- // Run the reader thread on an extra thread
- ThreadPool readExecutor(1);
- if (!options.decompress) {
- // Add a job that reads the input and starts all the compression jobs
- readExecutor.add(
- [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
- bytesRead = asyncCompressChunks(
- state,
- outs,
- executor,
- inputFd,
- inputSize,
- options.numThreads,
- options.determineParameters());
- });
- // Start writing
- bytesWritten = writeFile(state, outs, outputFd, options.decompress);
- } else {
- // Add a job that reads the input and starts all the decompression jobs
- readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
- bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
- });
- // Start writing
- bytesWritten = writeFile(state, outs, outputFd, options.decompress);
- }
- }
- if (!state.errorHolder.hasError()) {
- std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
- std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
- if (!options.decompress) {
- double ratio = static_cast<double>(bytesWritten) /
- static_cast<double>(bytesRead + !bytesRead);
- state.log(kLogInfo, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
- " bytes, %s)\n",
- inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
- outputFileName.c_str());
- } else {
- state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
- inputFileName.c_str(),bytesWritten);
- }
- }
- return bytesWritten;
- }
- static FILE *openInputFile(const std::string &inputFile,
- ErrorHolder &errorHolder) {
- if (inputFile == "-") {
- SET_BINARY_MODE(stdin);
- return stdin;
- }
- // Check if input file is a directory
- {
- std::error_code ec;
- if (is_directory(inputFile, ec)) {
- errorHolder.setError("Output file is a directory -- ignored");
- return nullptr;
- }
- }
- auto inputFd = std::fopen(inputFile.c_str(), "rb");
- if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
- return nullptr;
- }
- return inputFd;
- }
- static FILE *openOutputFile(const Options &options,
- const std::string &outputFile,
- SharedState& state) {
- if (outputFile == "-") {
- SET_BINARY_MODE(stdout);
- return stdout;
- }
- // Check if the output file exists and then open it
- if (!options.overwrite && outputFile != nullOutput) {
- auto outputFd = std::fopen(outputFile.c_str(), "rb");
- if (outputFd != nullptr) {
- std::fclose(outputFd);
- if (!state.log.logsAt(kLogInfo)) {
- state.errorHolder.setError("Output file exists");
- return nullptr;
- }
- state.log(
- kLogInfo,
- "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
- outputFile.c_str());
- int c = getchar();
- if (c != 'y' && c != 'Y') {
- state.errorHolder.setError("Not overwritten");
- return nullptr;
- }
- }
- }
- auto outputFd = std::fopen(outputFile.c_str(), "wb");
- if (!state.errorHolder.check(
- outputFd != nullptr, "Failed to open output file")) {
- return nullptr;
- }
- return outputFd;
- }
- int pzstdMain(const Options &options) {
- int returnCode = 0;
- SharedState state(options);
- for (const auto& input : options.inputFiles) {
- // Setup the shared state
- auto printErrorGuard = makeScopeGuard([&] {
- if (state.errorHolder.hasError()) {
- returnCode = 1;
- state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
- state.errorHolder.getError().c_str());
- }
- });
- // Open the input file
- auto inputFd = openInputFile(input, state.errorHolder);
- if (inputFd == nullptr) {
- continue;
- }
- auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
- // Open the output file
- auto outputFile = options.getOutputFile(input);
- if (!state.errorHolder.check(outputFile != "",
- "Input file does not have extension .zst")) {
- continue;
- }
- auto outputFd = openOutputFile(options, outputFile, state);
- if (outputFd == nullptr) {
- continue;
- }
- auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
- // (de)compress the file
- handleOneInput(options, input, inputFd, outputFile, outputFd, state);
- if (state.errorHolder.hasError()) {
- continue;
- }
- // Delete the input file if necessary
- if (!options.keepSource) {
- // Be sure that we are done and have written everything before we delete
- if (!state.errorHolder.check(std::fclose(inputFd) == 0,
- "Failed to close input file")) {
- continue;
- }
- closeInputGuard.dismiss();
- if (!state.errorHolder.check(std::fclose(outputFd) == 0,
- "Failed to close output file")) {
- continue;
- }
- closeOutputGuard.dismiss();
- if (std::remove(input.c_str()) != 0) {
- state.errorHolder.setError("Failed to remove input file");
- continue;
- }
- }
- }
- // Returns 1 if any of the files failed to (de)compress.
- return returnCode;
- }
- /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
- static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
- return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
- }
- /**
- * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
- * `inBuffer.pos`.
- */
- void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
- auto pos = inBuffer.pos;
- inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
- inBuffer.size -= pos;
- inBuffer.pos = 0;
- return buffer.advance(pos);
- }
- /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
- static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
- return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
- }
- /**
- * Split `buffer` and advance `outBuffer` by the amount of data written, as
- * indicated by `outBuffer.pos`.
- */
- Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
- auto pos = outBuffer.pos;
- outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
- outBuffer.size -= pos;
- outBuffer.pos = 0;
- return buffer.splitAt(pos);
- }
- /**
- * Stream chunks of input from `in`, compress it, and stream it out to `out`.
- *
- * @param state The shared state
- * @param in Queue that we `pop()` input buffers from
- * @param out Queue that we `push()` compressed output buffers to
- * @param maxInputSize An upper bound on the size of the input
- */
- static void compress(
- SharedState& state,
- std::shared_ptr<BufferWorkQueue> in,
- std::shared_ptr<BufferWorkQueue> out,
- size_t maxInputSize) {
- auto& errorHolder = state.errorHolder;
- auto guard = makeScopeGuard([&] { out->finish(); });
- // Initialize the CCtx
- auto ctx = state.cStreamPool->get();
- if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
- return;
- }
- {
- auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
- if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
- return;
- }
- }
- // Allocate space for the result
- auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
- auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
- {
- Buffer inBuffer;
- // Read a buffer in from the input queue
- while (in->pop(inBuffer) && !errorHolder.hasError()) {
- auto zstdInBuffer = makeZstdInBuffer(inBuffer);
- // Compress the whole buffer and send it to the output queue
- while (!inBuffer.empty() && !errorHolder.hasError()) {
- if (!errorHolder.check(
- !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
- return;
- }
- // Compress
- auto err =
- ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
- if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
- return;
- }
- // Split the compressed data off outBuffer and pass to the output queue
- out->push(split(outBuffer, zstdOutBuffer));
- // Forget about the data we already compressed
- advance(inBuffer, zstdInBuffer);
- }
- }
- }
- // Write the epilog
- size_t bytesLeft;
- do {
- if (!errorHolder.check(
- !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
- return;
- }
- bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
- if (!errorHolder.check(
- !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
- return;
- }
- out->push(split(outBuffer, zstdOutBuffer));
- } while (bytesLeft != 0 && !errorHolder.hasError());
- }
- /**
- * Calculates how large each independently compressed frame should be.
- *
- * @param size The size of the source if known, 0 otherwise
- * @param numThreads The number of threads available to run compression jobs on
- * @param params The zstd parameters to be used for compression
- */
- static size_t calculateStep(
- std::uintmax_t size,
- size_t numThreads,
- const ZSTD_parameters ¶ms) {
- (void)size;
- (void)numThreads;
- return size_t{1} << (params.cParams.windowLog + 2);
- }
namespace {
/// Outcome of inspecting a stream after a read.
enum class FileStatus { Continue, Done, Error };

/// Classifies the stream `fd`: `Done` when its end-of-file indicator is set,
/// otherwise `Error` when its error indicator is set, otherwise `Continue`.
FileStatus fileStatus(FILE* fd) {
  if (std::feof(fd) != 0) {
    return FileStatus::Done;
  }
  return std::ferror(fd) != 0 ? FileStatus::Error : FileStatus::Continue;
}
} // anonymous namespace
- /**
- * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
- * Will read less if an error or EOF occurs.
- * Returns the status of the file after all of the reads have occurred.
- */
- static FileStatus
- readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
- std::uint64_t *totalBytesRead) {
- Buffer buffer(size);
- while (!buffer.empty()) {
- auto bytesRead =
- std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
- *totalBytesRead += bytesRead;
- queue.push(buffer.splitAt(bytesRead));
- auto status = fileStatus(fd);
- if (status != FileStatus::Continue) {
- return status;
- }
- }
- return FileStatus::Continue;
- }
- std::uint64_t asyncCompressChunks(
- SharedState& state,
- WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
- ThreadPool& executor,
- FILE* fd,
- std::uintmax_t size,
- size_t numThreads,
- ZSTD_parameters params) {
- auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
- std::uint64_t bytesRead = 0;
- // Break the input up into chunks of size `step` and compress each chunk
- // independently.
- size_t step = calculateStep(size, numThreads, params);
- state.log(kLogDebug, "Chosen frame size: %zu\n", step);
- auto status = FileStatus::Continue;
- while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
- // Make a new input queue that we will put the chunk's input data into.
- auto in = std::make_shared<BufferWorkQueue>();
- auto inGuard = makeScopeGuard([&] { in->finish(); });
- // Make a new output queue that compress will put the compressed data into.
- auto out = std::make_shared<BufferWorkQueue>();
- // Start compression in the thread pool
- executor.add([&state, in, out, step] {
- return compress(
- state, std::move(in), std::move(out), step);
- });
- // Pass the output queue to the writer thread.
- chunks.push(std::move(out));
- state.log(kLogVerbose, "%s\n", "Starting a new frame");
- // Fill the input queue for the compression job we just started
- status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
- }
- state.errorHolder.check(status != FileStatus::Error, "Error reading input");
- return bytesRead;
- }
- /**
- * Decompress a frame, whose data is streamed into `in`, and stream the output
- * to `out`.
- *
- * @param state The shared state
- * @param in Queue that we `pop()` input buffers from. It contains
- * exactly one compressed frame.
- * @param out Queue that we `push()` decompressed output buffers to
- */
- static void decompress(
- SharedState& state,
- std::shared_ptr<BufferWorkQueue> in,
- std::shared_ptr<BufferWorkQueue> out) {
- auto& errorHolder = state.errorHolder;
- auto guard = makeScopeGuard([&] { out->finish(); });
- // Initialize the DCtx
- auto ctx = state.dStreamPool->get();
- if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
- return;
- }
- {
- auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
- if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
- return;
- }
- }
- const size_t outSize = ZSTD_DStreamOutSize();
- Buffer inBuffer;
- size_t returnCode = 0;
- // Read a buffer in from the input queue
- while (in->pop(inBuffer) && !errorHolder.hasError()) {
- auto zstdInBuffer = makeZstdInBuffer(inBuffer);
- // Decompress the whole buffer and send it to the output queue
- while (!inBuffer.empty() && !errorHolder.hasError()) {
- // Allocate a buffer with at least outSize bytes.
- Buffer outBuffer(outSize);
- auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
- // Decompress
- returnCode =
- ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
- if (!errorHolder.check(
- !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
- return;
- }
- // Pass the buffer with the decompressed data to the output queue
- out->push(split(outBuffer, zstdOutBuffer));
- // Advance past the input we already read
- advance(inBuffer, zstdInBuffer);
- if (returnCode == 0) {
- // The frame is over, prepare to (maybe) start a new frame
- ZSTD_initDStream(ctx.get());
- }
- }
- }
- if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
- return;
- }
- // We've given ZSTD_decompressStream all of our data, but there may still
- // be data to read.
- while (returnCode == 1) {
- // Allocate a buffer with at least outSize bytes.
- Buffer outBuffer(outSize);
- auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
- // Pass in no input.
- ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
- // Decompress
- returnCode =
- ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
- if (!errorHolder.check(
- !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
- return;
- }
- // Pass the buffer with the decompressed data to the output queue
- out->push(split(outBuffer, zstdOutBuffer));
- }
- }
- std::uint64_t asyncDecompressFrames(
- SharedState& state,
- WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
- ThreadPool& executor,
- FILE* fd) {
- auto framesGuard = makeScopeGuard([&] { frames.finish(); });
- std::uint64_t totalBytesRead = 0;
- // Split the source up into its component frames.
- // If we find our recognized skippable frame we know the next frames size
- // which means that we can decompress each standard frame in independently.
- // Otherwise, we will decompress using only one decompression task.
- const size_t chunkSize = ZSTD_DStreamInSize();
- auto status = FileStatus::Continue;
- while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
- // Make a new input queue that we will put the frames's bytes into.
- auto in = std::make_shared<BufferWorkQueue>();
- auto inGuard = makeScopeGuard([&] { in->finish(); });
- // Make a output queue that decompress will put the decompressed data into
- auto out = std::make_shared<BufferWorkQueue>();
- size_t frameSize;
- {
- // Calculate the size of the next frame.
- // frameSize is 0 if the frame info can't be decoded.
- Buffer buffer(SkippableFrame::kSize);
- auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
- totalBytesRead += bytesRead;
- status = fileStatus(fd);
- if (bytesRead == 0 && status != FileStatus::Continue) {
- break;
- }
- buffer.subtract(buffer.size() - bytesRead);
- frameSize = SkippableFrame::tryRead(buffer.range());
- in->push(std::move(buffer));
- }
- if (frameSize == 0) {
- // We hit a non SkippableFrame, so this will be the last job.
- // Make sure that we don't use too much memory
- in->setMaxSize(64);
- out->setMaxSize(64);
- }
- // Start decompression in the thread pool
- executor.add([&state, in, out] {
- return decompress(state, std::move(in), std::move(out));
- });
- // Pass the output queue to the writer thread
- frames.push(std::move(out));
- if (frameSize == 0) {
- // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
- // Pass the rest of the source to this decompression task
- state.log(kLogVerbose, "%s\n",
- "Input not in pzstd format, falling back to serial decompression");
- while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
- status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
- }
- break;
- }
- state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
- // Fill the input queue for the decompression job we just started
- status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
- }
- state.errorHolder.check(status != FileStatus::Error, "Error reading input");
- return totalBytesRead;
- }
- /// Write `data` to `fd`, returns true iff success.
- static bool writeData(ByteRange data, FILE* fd) {
- while (!data.empty()) {
- data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
- if (std::ferror(fd)) {
- return false;
- }
- }
- return true;
- }
- std::uint64_t writeFile(
- SharedState& state,
- WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
- FILE* outputFd,
- bool decompress) {
- auto& errorHolder = state.errorHolder;
- auto lineClearGuard = makeScopeGuard([&state] {
- state.log.clear(kLogInfo);
- });
- std::uint64_t bytesWritten = 0;
- std::shared_ptr<BufferWorkQueue> out;
- // Grab the output queue for each decompression job (in order).
- while (outs.pop(out)) {
- if (errorHolder.hasError()) {
- continue;
- }
- if (!decompress) {
- // If we are compressing and want to write skippable frames we can't
- // start writing before compression is done because we need to know the
- // compressed size.
- // Wait for the compressed size to be available and write skippable frame
- SkippableFrame frame(out->size());
- if (!writeData(frame.data(), outputFd)) {
- errorHolder.setError("Failed to write output");
- return bytesWritten;
- }
- bytesWritten += frame.kSize;
- }
- // For each chunk of the frame: Pop it from the queue and write it
- Buffer buffer;
- while (out->pop(buffer) && !errorHolder.hasError()) {
- if (!writeData(buffer.range(), outputFd)) {
- errorHolder.setError("Failed to write output");
- return bytesWritten;
- }
- bytesWritten += buffer.size();
- state.log.update(kLogInfo, "Written: %u MB ",
- static_cast<std::uint32_t>(bytesWritten >> 20));
- }
- }
- return bytesWritten;
- }
- }
|