Pzstd.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. /*
  2. * Copyright (c) 2016-present, Facebook, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. */
  9. #pragma once
  10. #include "ErrorHolder.h"
  11. #include "Logging.h"
  12. #include "Options.h"
  13. #include "utils/Buffer.h"
  14. #include "utils/Range.h"
  15. #include "utils/ResourcePool.h"
  16. #include "utils/ThreadPool.h"
  17. #include "utils/WorkQueue.h"
  18. #define ZSTD_STATIC_LINKING_ONLY
  19. #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
  20. * and uses deprecated functions
  21. */
  22. #include "zstd.h"
  23. #undef ZSTD_STATIC_LINKING_ONLY
  24. #include <cstddef>
  25. #include <cstdint>
  26. #include <memory>
  27. namespace pzstd {
  28. /**
  29. * Runs pzstd with `options`, compressing or decompressing as they request.
  30. * The outcome is reported through the integer return code described below.
  31. *
  32. * @param options The pzstd options to use for (de)compression
  33. * @returns 0 upon success and non-zero on failure.
  34. */
  35. int pzstdMain(const Options& options);
  36. class SharedState {
  37. public:
  38. SharedState(const Options& options) : log(options.verbosity) {
  39. if (!options.decompress) {
  40. auto parameters = options.determineParameters();
  41. cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
  42. [this, parameters]() -> ZSTD_CStream* {
  43. this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
  44. auto zcs = ZSTD_createCStream();
  45. if (zcs) {
  46. auto err = ZSTD_initCStream_advanced(
  47. zcs, nullptr, 0, parameters, 0);
  48. if (ZSTD_isError(err)) {
  49. ZSTD_freeCStream(zcs);
  50. return nullptr;
  51. }
  52. }
  53. return zcs;
  54. },
  55. [](ZSTD_CStream *zcs) {
  56. ZSTD_freeCStream(zcs);
  57. }});
  58. } else {
  59. dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
  60. [this]() -> ZSTD_DStream* {
  61. this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
  62. auto zds = ZSTD_createDStream();
  63. if (zds) {
  64. auto err = ZSTD_initDStream(zds);
  65. if (ZSTD_isError(err)) {
  66. ZSTD_freeDStream(zds);
  67. return nullptr;
  68. }
  69. }
  70. return zds;
  71. },
  72. [](ZSTD_DStream *zds) {
  73. ZSTD_freeDStream(zds);
  74. }});
  75. }
  76. }
  77. ~SharedState() {
  78. // The resource pools have references to this, so destroy them first.
  79. cStreamPool.reset();
  80. dStreamPool.reset();
  81. }
  82. Logger log;
  83. ErrorHolder errorHolder;
  84. std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
  85. std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
  86. };
  87. /**
  88. * Streams input from `fd`, breaks input up into chunks, and compresses each
  89. * chunk independently. Output of each chunk gets streamed to a queue, and
  90. * the output queues get put into `chunks` in order.
  91. *
  92. * @param state The shared state
  93. * @param chunks Each compression job's output queue gets pushed here
  94. * as soon as it is available
  95. * @param executor The thread pool to run compression jobs in
  96. * @param fd The input file, passed as a `FILE*` stream
  97. * @param size The size of the input file if known, 0 otherwise
  98. * @param numThreads The number of threads in the thread pool
  99. * @param parameters The zstd parameters to use for compression
  100. * @returns The number of bytes read from the file
  101. */
  102. std::uint64_t asyncCompressChunks(
  103. SharedState& state,
  104. WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
  105. ThreadPool& executor,
  106. FILE* fd,
  107. std::uintmax_t size,
  108. std::size_t numThreads,
  109. ZSTD_parameters parameters);
  110. /**
  111. * Streams input from `fd`. If pzstd headers are available it breaks the input
  112. * up into independent frames. It sends each frame to an independent
  113. * decompression job. Output of each frame gets streamed to a queue, and
  114. * the output queues get put into `frames` in order.
  115. *
  116. * @param state The shared state
  117. * @param frames Each decompression job's output queue gets pushed here
  118. * as soon as it is available
  119. * @param executor The thread pool to run decompression jobs in
  120. * @param fd The input file, passed as a `FILE*` stream
  121. * @returns The number of bytes read from the file
  122. */
  123. std::uint64_t asyncDecompressFrames(
  124. SharedState& state,
  125. WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
  126. ThreadPool& executor,
  127. FILE* fd);
  128. /**
  129. * Reads the data from each queue in `outs`, in order, and writes it to
  130. * `outputFd`.
  131. *
  132. * @param state The shared state
  133. * @param outs A queue of output queues, one for each
  134. * (de)compression job.
  135. * @param outputFd The output file, passed as a `FILE*` stream
  136. * @param decompress True when decompressing, false when compressing
  137. * @returns The number of bytes written
  138. */
  139. std::uint64_t writeFile(
  140. SharedState& state,
  141. WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
  142. FILE* outputFd,
  143. bool decompress);
  144. }