  1. /**************************************************************************/
  2. /* command_queue_mt.h */
  3. /**************************************************************************/
  4. /* This file is part of: */
  5. /* GODOT ENGINE */
  6. /* https://godotengine.org */
  7. /**************************************************************************/
  8. /* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
  9. /* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
  10. /* */
  11. /* Permission is hereby granted, free of charge, to any person obtaining */
  12. /* a copy of this software and associated documentation files (the */
  13. /* "Software"), to deal in the Software without restriction, including */
  14. /* without limitation the rights to use, copy, modify, merge, publish, */
  15. /* distribute, sublicense, and/or sell copies of the Software, and to */
  16. /* permit persons to whom the Software is furnished to do so, subject to */
  17. /* the following conditions: */
  18. /* */
  19. /* The above copyright notice and this permission notice shall be */
  20. /* included in all copies or substantial portions of the Software. */
  21. /* */
  22. /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
  23. /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
  24. /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
  25. /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
  26. /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
  27. /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
  28. /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
  29. /**************************************************************************/
  30. #pragma once
  31. #include "core/object/worker_thread_pool.h"
  32. #include "core/os/condition_variable.h"
  33. #include "core/os/mutex.h"
  34. #include "core/templates/local_vector.h"
  35. #include "core/templates/simple_type.h"
  36. #include "core/templates/tuple.h"
  37. #include "core/typedefs.h"
// Multi-threaded command queue: producer threads serialize deferred
// member-function calls into a shared byte buffer under a mutex; whichever
// thread flushes the queue executes them in FIFO order. Synchronous pushes
// block the producer until their command has run.
class CommandQueueMT {
	// Upper bound on the size of a single queued command object. Enforced with
	// a static_assert at every push site so the fixed-size local copy buffer
	// used in _flush() is always large enough.
	static const size_t MAX_COMMAND_SIZE = 1024;

	// Common header of every queued command: a virtual call() to execute it,
	// and a flag telling the flusher whether a producer is blocked waiting
	// for this command to complete.
	struct CommandBase {
		bool sync = false;
		virtual void call() = 0;
		virtual ~CommandBase() = default;

		CommandBase(bool p_sync) :
				sync(p_sync) {}
	};

	// Command that invokes (instance->*method)(args...) and discards the
	// result. NeedsSync selects (at compile time) whether the pushing thread
	// blocks until the command has been executed.
	template <typename T, typename M, bool NeedsSync, typename... Args>
	struct Command : public CommandBase {
		T *instance;
		M method;
		Tuple<GetSimpleTypeT<Args>...> args;

		template <typename... FwdArgs>
		_FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
				CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}

		void call() override {
			call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
			// Move out of the Tuple, this will be destroyed as soon as the call is complete.
			(instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};

	// Separate class from Command so we can save the space of the ret pointer
	// for commands that don't return. Always synchronous (CommandBase(true)):
	// the caller blocks until *ret has been written by call().
	template <typename T, typename M, typename R, typename... Args>
	struct CommandRet : public CommandBase {
		T *instance;
		M method;
		R *ret;
		Tuple<GetSimpleTypeT<Args>...> args;

		_FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
				CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}

		void call() override {
			*ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
			// Move out of the Tuple, this will be destroyed as soon as the call is complete.
			return (instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};

	/***** BASE *******/

	// Initial capacity reserved for the command buffer, in KiB.
	static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;

	// Per-thread re-entrancy guard: true while this thread is inside _flush(),
	// so a command that re-enters the queue doesn't try to re-lock `mutex`.
	inline static thread_local bool flushing = false;

	BinaryMutex mutex; // Guards all queue state below.
	LocalVector<uint8_t> command_mem; // Packed stream of [u64 size][command bytes] records.
	ConditionVariable sync_cond_var; // Notified each time a sync command finishes.
	uint32_t sync_head = 0; // Count of sync commands executed so far.
	uint32_t sync_tail = 0; // Count of sync commands pushed so far.
	uint32_t sync_awaiters = 0; // Threads currently blocked in _wait_for_sync().
	WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID; // Optional pool task that pumps this queue.
	uint64_t flush_read_ptr = 0; // Read offset into command_mem; nonzero means a flush is in progress.
	std::atomic<bool> pending{ false }; // True when there are unflushed commands.

	// Appends a new command of type T at the end of command_mem, preceded by a
	// 64-bit header holding its allocation size (sizeof(T) rounded up to a
	// multiple of 8 to keep commands aligned). Caller must hold `mutex`.
	template <typename T, typename... Args>
	_FORCE_INLINE_ void create_command(Args &&...p_args) {
		// alloc size is size+T+safeguard
		constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
		static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");

		uint64_t size = command_mem.size();
		command_mem.resize(size + alloc_size + sizeof(uint64_t));
		*(uint64_t *)&command_mem[size] = alloc_size;
		void *cmd = &command_mem[size + sizeof(uint64_t)];
		memnew_placement(cmd, T(std::forward<Args>(p_args)...));
		pending.store(true);
	}

	// Enqueues a command of type T; if NeedsSync, blocks the calling thread
	// until the command has been executed by a flush.
	template <typename T, bool NeedsSync, typename... Args>
	_FORCE_INLINE_ void _push_internal(Args &&...args) {
		MutexLock mlock(mutex);
		create_command<T>(std::forward<Args>(args)...);

		// Wake the pump task (if one is registered) so it picks up the new command.
		if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
			WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
		}

		if constexpr (NeedsSync) {
			sync_tail++;
			_wait_for_sync(mlock);
		}
	}

	// Resets the sync counters to avoid uint32 wraparound. Only safe when no
	// thread is waiting and every pushed sync command has already executed.
	// Caller must hold `mutex`.
	_FORCE_INLINE_ void _prevent_sync_wraparound() {
		bool safe_to_reset = !sync_awaiters;
		bool already_sync_to_latest = sync_head == sync_tail;
		if (safe_to_reset && already_sync_to_latest) {
			sync_head = 0;
			sync_tail = 0;
		}
	}

	// Executes every queued command in order, then clears the buffer.
	void _flush() {
		// Safeguard against trying to re-lock the binary mutex.
		if (flushing) {
			return;
		}

		flushing = true;
		MutexLock lock(mutex);

		if (unlikely(flush_read_ptr)) {
			// Another thread is flushing. Instead of flushing here, push a
			// no-op sync command (via sync()) and wait for that thread to
			// reach it, which guarantees the queue has been drained.
			lock.temp_unlock(); // Not really temp.
			sync();
			flushing = false;
			return;
		}

		// Scratch space each command is relocated into before being called.
		alignas(uint64_t) char cmd_local_mem[MAX_COMMAND_SIZE];

		while (flush_read_ptr < command_mem.size()) {
			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
			flush_read_ptr += sizeof(uint64_t);

			// Protect against race condition between this thread
			// during the call to the command and other threads potentially
			// invalidating the pointer due to reallocs by relocating the object.
			CommandBase *cmd_original = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
			CommandBase *cmd_local = reinterpret_cast<CommandBase *>(cmd_local_mem);
			memcpy(cmd_local_mem, (char *)cmd_original, size);

			// Run the command unlocked so other threads can keep pushing
			// (and so the command itself may push without deadlocking).
			lock.temp_unlock();
			cmd_local->call();
			lock.temp_relock();

			if (unlikely(cmd_local->sync)) {
				sync_head++;
				lock.temp_unlock(); // Give an opportunity to awaiters right away.
				sync_cond_var.notify_all();
				lock.temp_relock();
			}

			// Destroy the relocated copy; the bytes in command_mem are
			// discarded wholesale by the clear() below.
			cmd_local->~CommandBase();

			flush_read_ptr += size;
		}

		command_mem.clear();
		pending.store(false);
		flush_read_ptr = 0;

		_prevent_sync_wraparound();

		flushing = false;
	}

	// Blocks until every sync command pushed before this call (i.e. up to the
	// current sync_tail) has been executed. `p_lock` must be held on entry;
	// the condition variable releases it while waiting.
	_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
		sync_awaiters++;
		uint32_t sync_head_goal = sync_tail;
		do {
			sync_cond_var.wait(p_lock);
		} while (sync_head < sync_head_goal);
		sync_awaiters--;
		_prevent_sync_wraparound();
	}

	// Target for the no-op sync command pushed by sync().
	void _no_op() {}

public:
	// Enqueues a call to p_method on p_instance; returns immediately.
	template <typename T, typename M, typename... Args>
	void push(T *p_instance, M p_method, Args &&...p_args) {
		// Standard command, no sync.
		using CommandType = Command<T, M, false, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	// Enqueues a call and blocks until it has been executed.
	template <typename T, typename M, typename... Args>
	void push_and_sync(T *p_instance, M p_method, Args... p_args) {
		// Standard command, sync.
		using CommandType = Command<T, M, true, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	// Enqueues a call, blocks until it has been executed, and stores its
	// return value into *r_ret.
	template <typename T, typename M, typename R, typename... Args>
	void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
		// Command with return value, sync.
		using CommandType = CommandRet<T, M, R, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
	}

	// Flushes only if something is queued (cheap atomic check before locking).
	_FORCE_INLINE_ void flush_if_pending() {
		if (unlikely(pending.load())) {
			_flush();
		}
	}

	// Unconditionally executes all queued commands.
	void flush_all() {
		_flush();
	}

	// Blocks until every command queued so far has been executed, by pushing
	// a synchronous no-op and waiting for it.
	void sync() {
		push_and_sync(this, &CommandQueueMT::_no_op);
	}

	// Waits for the registered pump task to finish, then flushes whatever
	// remains in the queue. Errors out if no pump task was set.
	void wait_and_flush() {
		ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
		WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
		_flush();
	}

	// Registers the WorkerThreadPool task that pumps this queue, so pushes
	// can wake it via notify_yield_over().
	void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
		MutexLock lock(mutex);
		pump_task_id = p_task_id;
	}

	CommandQueueMT() {
		// Pre-reserve the command buffer to avoid reallocations early on.
		command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024);
	}
};