command_queue_mt.h 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /**************************************************************************/
  2. /* command_queue_mt.h */
  3. /**************************************************************************/
  4. /* This file is part of: */
  5. /* GODOT ENGINE */
  6. /* https://godotengine.org */
  7. /**************************************************************************/
  8. /* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
  9. /* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
  10. /* */
  11. /* Permission is hereby granted, free of charge, to any person obtaining */
  12. /* a copy of this software and associated documentation files (the */
  13. /* "Software"), to deal in the Software without restriction, including */
  14. /* without limitation the rights to use, copy, modify, merge, publish, */
  15. /* distribute, sublicense, and/or sell copies of the Software, and to */
  16. /* permit persons to whom the Software is furnished to do so, subject to */
  17. /* the following conditions: */
  18. /* */
  19. /* The above copyright notice and this permission notice shall be */
  20. /* included in all copies or substantial portions of the Software. */
  21. /* */
  22. /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
  23. /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
  24. /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
  25. /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
  26. /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
  27. /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
  28. /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
  29. /**************************************************************************/
  30. #pragma once
  31. #include "core/object/worker_thread_pool.h"
  32. #include "core/os/condition_variable.h"
  33. #include "core/os/mutex.h"
  34. #include "core/templates/local_vector.h"
  35. #include "core/templates/simple_type.h"
  36. #include "core/templates/tuple.h"
  37. #include "core/typedefs.h"
  38. class CommandQueueMT {
  39. struct CommandBase {
  40. bool sync = false;
  41. virtual void call() = 0;
  42. virtual ~CommandBase() = default;
  43. CommandBase(bool p_sync) :
  44. sync(p_sync) {}
  45. };
  46. template <typename T, typename M, bool NeedsSync, typename... Args>
  47. struct Command : public CommandBase {
  48. T *instance;
  49. M method;
  50. Tuple<GetSimpleTypeT<Args>...> args;
  51. template <typename... FwdArgs>
  52. _FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
  53. CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}
  54. void call() {
  55. call_impl(BuildIndexSequence<sizeof...(Args)>{});
  56. }
  57. private:
  58. template <size_t... I>
  59. _FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
  60. // Move out of the Tuple, this will be destroyed as soon as the call is complete.
  61. (instance->*method)(std::move(get<I>())...);
  62. }
  63. // This method exists so we can call it in the parameter pack expansion in call_impl.
  64. template <size_t I>
  65. _FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
  66. };
  67. // Separate class from Command so we can save the space of the ret pointer for commands that don't return.
  68. template <typename T, typename M, typename R, typename... Args>
  69. struct CommandRet : public CommandBase {
  70. T *instance;
  71. M method;
  72. R *ret;
  73. Tuple<GetSimpleTypeT<Args>...> args;
  74. _FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
  75. CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}
  76. void call() override {
  77. *ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
  78. }
  79. private:
  80. template <size_t... I>
  81. _FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
  82. // Move out of the Tuple, this will be destroyed as soon as the call is complete.
  83. return (instance->*method)(std::move(get<I>())...);
  84. }
  85. // This method exists so we can call it in the parameter pack expansion in call_impl.
  86. template <size_t I>
  87. _FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
  88. };
  89. /***** BASE *******/
  90. static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
  91. BinaryMutex mutex;
  92. LocalVector<uint8_t> command_mem;
  93. ConditionVariable sync_cond_var;
  94. uint32_t sync_head = 0;
  95. uint32_t sync_tail = 0;
  96. uint32_t sync_awaiters = 0;
  97. WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
  98. uint64_t flush_read_ptr = 0;
  99. std::atomic<bool> pending{ false };
  100. template <typename T, typename... Args>
  101. _FORCE_INLINE_ void create_command(Args &&...p_args) {
  102. // alloc size is size+T+safeguard
  103. constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
  104. static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");
  105. uint64_t size = command_mem.size();
  106. command_mem.resize(size + alloc_size + sizeof(uint64_t));
  107. *(uint64_t *)&command_mem[size] = alloc_size;
  108. void *cmd = &command_mem[size + sizeof(uint64_t)];
  109. new (cmd) T(std::forward<Args>(p_args)...);
  110. pending.store(true);
  111. }
  112. template <typename T, bool NeedsSync, typename... Args>
  113. _FORCE_INLINE_ void _push_internal(Args &&...args) {
  114. MutexLock mlock(mutex);
  115. create_command<T>(std::forward<Args>(args)...);
  116. if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
  117. WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
  118. }
  119. if constexpr (NeedsSync) {
  120. sync_tail++;
  121. _wait_for_sync(mlock);
  122. }
  123. }
  124. _FORCE_INLINE_ void _prevent_sync_wraparound() {
  125. bool safe_to_reset = !sync_awaiters;
  126. bool already_sync_to_latest = sync_head == sync_tail;
  127. if (safe_to_reset && already_sync_to_latest) {
  128. sync_head = 0;
  129. sync_tail = 0;
  130. }
  131. }
  132. void _flush() {
  133. if (unlikely(flush_read_ptr)) {
  134. // Re-entrant call.
  135. return;
  136. }
  137. MutexLock lock(mutex);
  138. while (flush_read_ptr < command_mem.size()) {
  139. uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
  140. flush_read_ptr += 8;
  141. CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
  142. uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
  143. cmd->call();
  144. WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
  145. // Handle potential realloc due to the command and unlock allowance.
  146. cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
  147. if (unlikely(cmd->sync)) {
  148. sync_head++;
  149. lock.~MutexLock(); // Give an opportunity to awaiters right away.
  150. sync_cond_var.notify_all();
  151. new (&lock) MutexLock(mutex);
  152. // Handle potential realloc happened during unlock.
  153. cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
  154. }
  155. cmd->~CommandBase();
  156. flush_read_ptr += size;
  157. }
  158. command_mem.clear();
  159. pending.store(false);
  160. flush_read_ptr = 0;
  161. _prevent_sync_wraparound();
  162. }
  163. _FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
  164. sync_awaiters++;
  165. uint32_t sync_head_goal = sync_tail;
  166. do {
  167. sync_cond_var.wait(p_lock);
  168. } while (sync_head < sync_head_goal);
  169. sync_awaiters--;
  170. _prevent_sync_wraparound();
  171. }
  172. void _no_op() {}
  173. public:
  174. template <typename T, typename M, typename... Args>
  175. void push(T *p_instance, M p_method, Args &&...p_args) {
  176. // Standard command, no sync.
  177. using CommandType = Command<T, M, false, Args...>;
  178. _push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
  179. }
  180. template <typename T, typename M, typename... Args>
  181. void push_and_sync(T *p_instance, M p_method, Args... p_args) {
  182. // Standard command, sync.
  183. using CommandType = Command<T, M, true, Args...>;
  184. _push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
  185. }
  186. template <typename T, typename M, typename R, typename... Args>
  187. void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
  188. // Command with return value, sync.
  189. using CommandType = CommandRet<T, M, R, Args...>;
  190. _push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
  191. }
  192. _FORCE_INLINE_ void flush_if_pending() {
  193. if (unlikely(pending.load())) {
  194. _flush();
  195. }
  196. }
  197. void flush_all() {
  198. _flush();
  199. }
  200. void sync() {
  201. push_and_sync(this, &CommandQueueMT::_no_op);
  202. }
  203. void wait_and_flush() {
  204. ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
  205. WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
  206. _flush();
  207. }
  208. void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
  209. MutexLock lock(mutex);
  210. pump_task_id = p_task_id;
  211. }
  212. CommandQueueMT();
  213. ~CommandQueueMT();
  214. };