parallel_compression.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /*
  2. * Copyright (c) 2017-present, Facebook, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. */
#include <errno.h>     // errno, ERANGE
#include <limits.h>    // INT_MAX, UINT_MAX
#include <stdlib.h> // malloc, free, exit, atoi
#include <stdio.h> // fprintf, perror, feof, fopen, etc.
#include <string.h> // strlen, memset, strcat
  12. #define ZSTD_STATIC_LINKING_ONLY
  13. #include <zstd.h> // presumes zstd library is installed
  14. #include <zstd_errors.h>
  15. #if defined(WIN32) || defined(_WIN32)
  16. # include <windows.h>
  17. # define SLEEP(x) Sleep(x)
  18. #else
  19. # include <unistd.h>
  20. # define SLEEP(x) usleep(x * 1000)
  21. #endif
  22. #include "xxhash.h"
  23. #include "pool.h" // use zstd thread pool for demo
  24. #include "zstd_seekable.h"
  25. static void* malloc_orDie(size_t size)
  26. {
  27. void* const buff = malloc(size);
  28. if (buff) return buff;
  29. /* error */
  30. perror("malloc:");
  31. exit(1);
  32. }
  33. static FILE* fopen_orDie(const char *filename, const char *instruction)
  34. {
  35. FILE* const inFile = fopen(filename, instruction);
  36. if (inFile) return inFile;
  37. /* error */
  38. perror(filename);
  39. exit(3);
  40. }
  41. static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file)
  42. {
  43. size_t const readSize = fread(buffer, 1, sizeToRead, file);
  44. if (readSize == sizeToRead) return readSize; /* good */
  45. if (feof(file)) return readSize; /* good, reached end of file */
  46. /* error */
  47. perror("fread");
  48. exit(4);
  49. }
  50. static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file)
  51. {
  52. size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file);
  53. if (writtenSize == sizeToWrite) return sizeToWrite; /* good */
  54. /* error */
  55. perror("fwrite");
  56. exit(5);
  57. }
  58. static size_t fclose_orDie(FILE* file)
  59. {
  60. if (!fclose(file)) return 0;
  61. /* error */
  62. perror("fclose");
  63. exit(6);
  64. }
  65. static void fseek_orDie(FILE* file, long int offset, int origin)
  66. {
  67. if (!fseek(file, offset, origin)) {
  68. if (!fflush(file)) return;
  69. }
  70. /* error */
  71. perror("fseek");
  72. exit(7);
  73. }
  74. static long int ftell_orDie(FILE* file)
  75. {
  76. long int off = ftell(file);
  77. if (off != -1) return off;
  78. /* error */
  79. perror("ftell");
  80. exit(8);
  81. }
  82. struct job {
  83. const void* src;
  84. size_t srcSize;
  85. void* dst;
  86. size_t dstSize;
  87. unsigned checksum;
  88. int compressionLevel;
  89. int done;
  90. };
  91. static void compressFrame(void* opaque)
  92. {
  93. struct job* job = opaque;
  94. job->checksum = XXH64(job->src, job->srcSize, 0);
  95. size_t ret = ZSTD_compress(job->dst, job->dstSize, job->src, job->srcSize, job->compressionLevel);
  96. if (ZSTD_isError(ret)) {
  97. fprintf(stderr, "ZSTD_compress() error : %s \n", ZSTD_getErrorName(ret));
  98. exit(20);
  99. }
  100. job->dstSize = ret;
  101. job->done = 1;
  102. }
  103. static void compressFile_orDie(const char* fname, const char* outName, int cLevel, unsigned frameSize, int nbThreads)
  104. {
  105. POOL_ctx* pool = POOL_create(nbThreads, nbThreads);
  106. if (pool == NULL) { fprintf(stderr, "POOL_create() error \n"); exit(9); }
  107. FILE* const fin = fopen_orDie(fname, "rb");
  108. FILE* const fout = fopen_orDie(outName, "wb");
  109. if (ZSTD_compressBound(frameSize) > 0xFFFFFFFFU) { fprintf(stderr, "Frame size too large \n"); exit(10); }
  110. unsigned dstSize = ZSTD_compressBound(frameSize);
  111. fseek_orDie(fin, 0, SEEK_END);
  112. long int length = ftell_orDie(fin);
  113. fseek_orDie(fin, 0, SEEK_SET);
  114. size_t numFrames = (length + frameSize - 1) / frameSize;
  115. struct job* jobs = malloc_orDie(sizeof(struct job) * numFrames);
  116. size_t i;
  117. for(i = 0; i < numFrames; i++) {
  118. void* in = malloc_orDie(frameSize);
  119. void* out = malloc_orDie(dstSize);
  120. size_t inSize = fread_orDie(in, frameSize, fin);
  121. jobs[i].src = in;
  122. jobs[i].srcSize = inSize;
  123. jobs[i].dst = out;
  124. jobs[i].dstSize = dstSize;
  125. jobs[i].compressionLevel = cLevel;
  126. jobs[i].done = 0;
  127. POOL_add(pool, compressFrame, &jobs[i]);
  128. }
  129. ZSTD_frameLog* fl = ZSTD_seekable_createFrameLog(1);
  130. if (fl == NULL) { fprintf(stderr, "ZSTD_seekable_createFrameLog() failed \n"); exit(11); }
  131. for (i = 0; i < numFrames; i++) {
  132. while (!jobs[i].done) SLEEP(5); /* wake up every 5 milliseconds to check */
  133. fwrite_orDie(jobs[i].dst, jobs[i].dstSize, fout);
  134. free((void*)jobs[i].src);
  135. free(jobs[i].dst);
  136. size_t ret = ZSTD_seekable_logFrame(fl, jobs[i].dstSize, jobs[i].srcSize, jobs[i].checksum);
  137. if (ZSTD_isError(ret)) { fprintf(stderr, "ZSTD_seekable_logFrame() error : %s \n", ZSTD_getErrorName(ret)); }
  138. }
  139. { unsigned char seekTableBuff[1024];
  140. ZSTD_outBuffer out = {seekTableBuff, 1024, 0};
  141. while (ZSTD_seekable_writeSeekTable(fl, &out) != 0) {
  142. fwrite_orDie(seekTableBuff, out.pos, fout);
  143. out.pos = 0;
  144. }
  145. fwrite_orDie(seekTableBuff, out.pos, fout);
  146. }
  147. ZSTD_seekable_freeFrameLog(fl);
  148. free(jobs);
  149. fclose_orDie(fout);
  150. fclose_orDie(fin);
  151. }
  152. static const char* createOutFilename_orDie(const char* filename)
  153. {
  154. size_t const inL = strlen(filename);
  155. size_t const outL = inL + 5;
  156. void* outSpace = malloc_orDie(outL);
  157. memset(outSpace, 0, outL);
  158. strcat(outSpace, filename);
  159. strcat(outSpace, ".zst");
  160. return (const char*)outSpace;
  161. }
  162. int main(int argc, const char** argv) {
  163. const char* const exeName = argv[0];
  164. if (argc!=4) {
  165. printf("wrong arguments\n");
  166. printf("usage:\n");
  167. printf("%s FILE FRAME_SIZE NB_THREADS\n", exeName);
  168. return 1;
  169. }
  170. { const char* const inFileName = argv[1];
  171. unsigned const frameSize = (unsigned)atoi(argv[2]);
  172. int const nbThreads = atoi(argv[3]);
  173. const char* const outFileName = createOutFilename_orDie(inFileName);
  174. compressFile_orDie(inFileName, outFileName, 5, frameSize, nbThreads);
  175. }
  176. return 0;
  177. }