vp9_multi_thread.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. /*
  2. * Copyright (c) 2017 The WebM project authors. All Rights Reserved.
  3. *
  4. * Use of this source code is governed by a BSD-style license
  5. * that can be found in the LICENSE file in the root of the source
  6. * tree. An additional intellectual property rights grant can be found
  7. * in the file PATENTS. All contributing project authors may
  8. * be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #include <assert.h>
  11. #include "vp9/encoder/vp9_encoder.h"
  12. #include "vp9/encoder/vp9_ethread.h"
  13. #include "vp9/encoder/vp9_multi_thread.h"
  14. void *vp9_enc_grp_get_next_job(MultiThreadHandle *multi_thread_ctxt,
  15. int tile_id) {
  16. RowMTInfo *row_mt_info;
  17. JobQueueHandle *job_queue_hdl = NULL;
  18. void *next = NULL;
  19. JobNode *job_info = NULL;
  20. #if CONFIG_MULTITHREAD
  21. pthread_mutex_t *mutex_handle = NULL;
  22. #endif
  23. row_mt_info = (RowMTInfo *)(&multi_thread_ctxt->row_mt_info[tile_id]);
  24. job_queue_hdl = (JobQueueHandle *)&row_mt_info->job_queue_hdl;
  25. #if CONFIG_MULTITHREAD
  26. mutex_handle = &row_mt_info->job_mutex;
  27. #endif
  28. // lock the mutex for queue access
  29. #if CONFIG_MULTITHREAD
  30. pthread_mutex_lock(mutex_handle);
  31. #endif
  32. next = job_queue_hdl->next;
  33. if (NULL != next) {
  34. JobQueue *job_queue = (JobQueue *)next;
  35. job_info = &job_queue->job_info;
  36. // Update the next job in the queue
  37. job_queue_hdl->next = job_queue->next;
  38. job_queue_hdl->num_jobs_acquired++;
  39. }
  40. #if CONFIG_MULTITHREAD
  41. pthread_mutex_unlock(mutex_handle);
  42. #endif
  43. return job_info;
  44. }
  45. void vp9_row_mt_mem_alloc(VP9_COMP *cpi) {
  46. struct VP9Common *cm = &cpi->common;
  47. MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  48. int tile_row, tile_col;
  49. const int tile_cols = 1 << cm->log2_tile_cols;
  50. const int tile_rows = 1 << cm->log2_tile_rows;
  51. const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  52. int jobs_per_tile_col, total_jobs;
  53. jobs_per_tile_col = VPXMAX(cm->mb_rows, sb_rows);
  54. // Calculate the total number of jobs
  55. total_jobs = jobs_per_tile_col * tile_cols;
  56. multi_thread_ctxt->allocated_tile_cols = tile_cols;
  57. multi_thread_ctxt->allocated_tile_rows = tile_rows;
  58. multi_thread_ctxt->allocated_vert_unit_rows = jobs_per_tile_col;
  59. multi_thread_ctxt->job_queue =
  60. (JobQueue *)vpx_memalign(32, total_jobs * sizeof(JobQueue));
  61. #if CONFIG_MULTITHREAD
  62. // Create mutex for each tile
  63. for (tile_col = 0; tile_col < tile_cols; tile_col++) {
  64. RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
  65. pthread_mutex_init(&row_mt_info->job_mutex, NULL);
  66. }
  67. #endif
  68. // Allocate memory for row based multi-threading
  69. for (tile_col = 0; tile_col < tile_cols; tile_col++) {
  70. TileDataEnc *this_tile = &cpi->tile_data[tile_col];
  71. vp9_row_mt_sync_mem_alloc(&this_tile->row_mt_sync, cm, jobs_per_tile_col);
  72. if (cpi->sf.adaptive_rd_thresh_row_mt) {
  73. const int sb_rows =
  74. (mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2) + 1;
  75. int i;
  76. this_tile->row_base_thresh_freq_fact =
  77. (int *)vpx_calloc(sb_rows * BLOCK_SIZES * MAX_MODES,
  78. sizeof(*(this_tile->row_base_thresh_freq_fact)));
  79. for (i = 0; i < sb_rows * BLOCK_SIZES * MAX_MODES; i++)
  80. this_tile->row_base_thresh_freq_fact[i] = RD_THRESH_INIT_FACT;
  81. }
  82. }
  83. // Assign the sync pointer of tile row zero for every tile row > 0
  84. for (tile_row = 1; tile_row < tile_rows; tile_row++) {
  85. for (tile_col = 0; tile_col < tile_cols; tile_col++) {
  86. TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols + tile_col];
  87. TileDataEnc *this_col_tile = &cpi->tile_data[tile_col];
  88. this_tile->row_mt_sync = this_col_tile->row_mt_sync;
  89. }
  90. }
  91. // Calculate the number of vertical units in the given tile row
  92. for (tile_row = 0; tile_row < tile_rows; tile_row++) {
  93. TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols];
  94. TileInfo *tile_info = &this_tile->tile_info;
  95. multi_thread_ctxt->num_tile_vert_sbs[tile_row] =
  96. get_num_vert_units(*tile_info, MI_BLOCK_SIZE_LOG2);
  97. }
  98. }
  99. void vp9_row_mt_mem_dealloc(VP9_COMP *cpi) {
  100. MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  101. int tile_col;
  102. #if CONFIG_MULTITHREAD
  103. int tile_row;
  104. #endif
  105. // Deallocate memory for job queue
  106. if (multi_thread_ctxt->job_queue) vpx_free(multi_thread_ctxt->job_queue);
  107. #if CONFIG_MULTITHREAD
  108. // Destroy mutex for each tile
  109. for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
  110. tile_col++) {
  111. RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
  112. if (row_mt_info) pthread_mutex_destroy(&row_mt_info->job_mutex);
  113. }
  114. #endif
  115. // Free row based multi-threading sync memory
  116. for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
  117. tile_col++) {
  118. TileDataEnc *this_tile = &cpi->tile_data[tile_col];
  119. vp9_row_mt_sync_mem_dealloc(&this_tile->row_mt_sync);
  120. }
  121. #if CONFIG_MULTITHREAD
  122. for (tile_row = 0; tile_row < multi_thread_ctxt->allocated_tile_rows;
  123. tile_row++) {
  124. for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
  125. tile_col++) {
  126. TileDataEnc *this_tile =
  127. &cpi->tile_data[tile_row * multi_thread_ctxt->allocated_tile_cols +
  128. tile_col];
  129. if (cpi->sf.adaptive_rd_thresh_row_mt) {
  130. if (this_tile->row_base_thresh_freq_fact != NULL) {
  131. vpx_free(this_tile->row_base_thresh_freq_fact);
  132. this_tile->row_base_thresh_freq_fact = NULL;
  133. }
  134. }
  135. }
  136. }
  137. #endif
  138. }
  139. void vp9_multi_thread_tile_init(VP9_COMP *cpi) {
  140. VP9_COMMON *const cm = &cpi->common;
  141. const int tile_cols = 1 << cm->log2_tile_cols;
  142. const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  143. int i;
  144. for (i = 0; i < tile_cols; i++) {
  145. TileDataEnc *this_tile = &cpi->tile_data[i];
  146. int jobs_per_tile_col = cpi->oxcf.pass == 1 ? cm->mb_rows : sb_rows;
  147. // Initialize cur_col to -1 for all rows.
  148. memset(this_tile->row_mt_sync.cur_col, -1,
  149. sizeof(*this_tile->row_mt_sync.cur_col) * jobs_per_tile_col);
  150. vp9_zero(this_tile->fp_data);
  151. this_tile->fp_data.image_data_start_row = INVALID_ROW;
  152. }
  153. }
  154. void vp9_assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt,
  155. int tile_cols, int num_workers) {
  156. int tile_id = 0;
  157. int i;
  158. // Allocating the threads for the tiles
  159. for (i = 0; i < num_workers; i++) {
  160. multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++;
  161. if (tile_id == tile_cols) tile_id = 0;
  162. }
  163. }
  164. int vp9_get_job_queue_status(MultiThreadHandle *multi_thread_ctxt,
  165. int cur_tile_id) {
  166. RowMTInfo *row_mt_info;
  167. JobQueueHandle *job_queue_hndl;
  168. #if CONFIG_MULTITHREAD
  169. pthread_mutex_t *mutex;
  170. #endif
  171. int num_jobs_remaining;
  172. row_mt_info = &multi_thread_ctxt->row_mt_info[cur_tile_id];
  173. job_queue_hndl = &row_mt_info->job_queue_hdl;
  174. #if CONFIG_MULTITHREAD
  175. mutex = &row_mt_info->job_mutex;
  176. #endif
  177. #if CONFIG_MULTITHREAD
  178. pthread_mutex_lock(mutex);
  179. #endif
  180. num_jobs_remaining =
  181. multi_thread_ctxt->jobs_per_tile_col - job_queue_hndl->num_jobs_acquired;
  182. #if CONFIG_MULTITHREAD
  183. pthread_mutex_unlock(mutex);
  184. #endif
  185. return (num_jobs_remaining);
  186. }
  187. void vp9_prepare_job_queue(VP9_COMP *cpi, JOB_TYPE job_type) {
  188. VP9_COMMON *const cm = &cpi->common;
  189. MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  190. JobQueue *job_queue = multi_thread_ctxt->job_queue;
  191. const int tile_cols = 1 << cm->log2_tile_cols;
  192. int job_row_num, jobs_per_tile, jobs_per_tile_col, total_jobs;
  193. const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  194. int tile_col, i;
  195. jobs_per_tile_col = (job_type != ENCODE_JOB) ? cm->mb_rows : sb_rows;
  196. total_jobs = jobs_per_tile_col * tile_cols;
  197. multi_thread_ctxt->jobs_per_tile_col = jobs_per_tile_col;
  198. // memset the entire job queue buffer to zero
  199. memset(job_queue, 0, total_jobs * sizeof(JobQueue));
  200. // Job queue preparation
  201. for (tile_col = 0; tile_col < tile_cols; tile_col++) {
  202. RowMTInfo *tile_ctxt = &multi_thread_ctxt->row_mt_info[tile_col];
  203. JobQueue *job_queue_curr, *job_queue_temp;
  204. int tile_row = 0;
  205. tile_ctxt->job_queue_hdl.next = (void *)job_queue;
  206. tile_ctxt->job_queue_hdl.num_jobs_acquired = 0;
  207. job_queue_curr = job_queue;
  208. job_queue_temp = job_queue;
  209. // loop over all the vertical rows
  210. for (job_row_num = 0, jobs_per_tile = 0; job_row_num < jobs_per_tile_col;
  211. job_row_num++, jobs_per_tile++) {
  212. job_queue_curr->job_info.vert_unit_row_num = job_row_num;
  213. job_queue_curr->job_info.tile_col_id = tile_col;
  214. job_queue_curr->job_info.tile_row_id = tile_row;
  215. job_queue_curr->next = (void *)(job_queue_temp + 1);
  216. job_queue_curr = ++job_queue_temp;
  217. if (ENCODE_JOB == job_type) {
  218. if (jobs_per_tile >=
  219. multi_thread_ctxt->num_tile_vert_sbs[tile_row] - 1) {
  220. tile_row++;
  221. jobs_per_tile = -1;
  222. }
  223. }
  224. }
  225. // Set the last pointer to NULL
  226. job_queue_curr += -1;
  227. job_queue_curr->next = (void *)NULL;
  228. // Move to the next tile
  229. job_queue += jobs_per_tile_col;
  230. }
  231. for (i = 0; i < cpi->num_workers; i++) {
  232. EncWorkerData *thread_data;
  233. thread_data = &cpi->tile_thr_data[i];
  234. thread_data->thread_id = i;
  235. for (tile_col = 0; tile_col < tile_cols; tile_col++)
  236. thread_data->tile_completion_status[tile_col] = 0;
  237. }
  238. }
  239. int vp9_get_tiles_proc_status(MultiThreadHandle *multi_thread_ctxt,
  240. int *tile_completion_status, int *cur_tile_id,
  241. int tile_cols) {
  242. int tile_col;
  243. int tile_id = -1; // Stores the tile ID with minimum proc done
  244. int max_num_jobs_remaining = 0;
  245. int num_jobs_remaining;
  246. // Mark the completion to avoid check in the loop
  247. tile_completion_status[*cur_tile_id] = 1;
  248. // Check for the status of all the tiles
  249. for (tile_col = 0; tile_col < tile_cols; tile_col++) {
  250. if (tile_completion_status[tile_col] == 0) {
  251. num_jobs_remaining =
  252. vp9_get_job_queue_status(multi_thread_ctxt, tile_col);
  253. // Mark the completion to avoid checks during future switches across tiles
  254. if (num_jobs_remaining == 0) tile_completion_status[tile_col] = 1;
  255. if (num_jobs_remaining > max_num_jobs_remaining) {
  256. max_num_jobs_remaining = num_jobs_remaining;
  257. tile_id = tile_col;
  258. }
  259. }
  260. }
  261. if (-1 == tile_id) {
  262. return 1;
  263. } else {
  264. // Update the cur ID to the next tile ID that will be processed,
  265. // which will be the least processed tile
  266. *cur_tile_id = tile_id;
  267. return 0;
  268. }
  269. }