test_command_queue.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. /*************************************************************************/
  2. /* test_command_queue.h */
  3. /*************************************************************************/
  4. /* This file is part of: */
  5. /* GODOT ENGINE */
  6. /* https://godotengine.org */
  7. /*************************************************************************/
  8. /* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur. */
  9. /* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md). */
  10. /* */
  11. /* Permission is hereby granted, free of charge, to any person obtaining */
  12. /* a copy of this software and associated documentation files (the */
  13. /* "Software"), to deal in the Software without restriction, including */
  14. /* without limitation the rights to use, copy, modify, merge, publish, */
  15. /* distribute, sublicense, and/or sell copies of the Software, and to */
  16. /* permit persons to whom the Software is furnished to do so, subject to */
  17. /* the following conditions: */
  18. /* */
  19. /* The above copyright notice and this permission notice shall be */
  20. /* included in all copies or substantial portions of the Software. */
  21. /* */
  22. /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
  23. /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
  24. /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
  25. /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
  26. /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
  27. /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
  28. /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
  29. /*************************************************************************/
  30. #ifndef TEST_COMMAND_QUEUE_H
  31. #define TEST_COMMAND_QUEUE_H
  32. #include "core/config/project_settings.h"
  33. #include "core/math/random_number_generator.h"
  34. #include "core/os/os.h"
  35. #include "core/os/thread.h"
  36. #include "core/templates/command_queue_mt.h"
  37. #include "tests/test_macros.h"
  38. #if !defined(NO_THREADS)
  39. namespace TestCommandQueue {
  40. class ThreadWork {
  41. Semaphore thread_sem;
  42. Semaphore main_sem;
  43. Mutex mut;
  44. int threading_errors = 0;
  45. enum State {
  46. MAIN_START,
  47. MAIN_DONE,
  48. THREAD_START,
  49. THREAD_DONE,
  50. } state;
  51. public:
  52. ThreadWork() {
  53. mut.lock();
  54. state = MAIN_START;
  55. }
  56. ~ThreadWork() {
  57. CHECK_MESSAGE(threading_errors == 0, "threads did not lock/unlock correctly");
  58. }
  59. void thread_wait_for_work() {
  60. thread_sem.wait();
  61. mut.lock();
  62. if (state != MAIN_DONE) {
  63. threading_errors++;
  64. }
  65. state = THREAD_START;
  66. }
  67. void thread_done_work() {
  68. if (state != THREAD_START) {
  69. threading_errors++;
  70. }
  71. state = THREAD_DONE;
  72. mut.unlock();
  73. main_sem.post();
  74. }
  75. void main_wait_for_done() {
  76. main_sem.wait();
  77. mut.lock();
  78. if (state != THREAD_DONE) {
  79. threading_errors++;
  80. }
  81. state = MAIN_START;
  82. }
  83. void main_start_work() {
  84. if (state != MAIN_START) {
  85. threading_errors++;
  86. }
  87. state = MAIN_DONE;
  88. mut.unlock();
  89. thread_sem.post();
  90. }
  91. };
  92. class SharedThreadState {
  93. public:
  94. ThreadWork reader_threadwork;
  95. ThreadWork writer_threadwork;
  96. CommandQueueMT command_queue = CommandQueueMT(true);
  97. enum TestMsgType {
  98. TEST_MSG_FUNC1_TRANSFORM,
  99. TEST_MSG_FUNC2_TRANSFORM_FLOAT,
  100. TEST_MSG_FUNC3_TRANSFORMx6,
  101. TEST_MSGSYNC_FUNC1_TRANSFORM,
  102. TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT,
  103. TEST_MSGRET_FUNC1_TRANSFORM,
  104. TEST_MSGRET_FUNC2_TRANSFORM_FLOAT,
  105. TEST_MSG_MAX
  106. };
  107. Vector<TestMsgType> message_types_to_write;
  108. bool during_writing = false;
  109. int message_count_to_read = 0;
  110. bool exit_threads = false;
  111. Thread reader_thread;
  112. Thread writer_thread;
  113. int func1_count = 0;
  114. void func1(Transform3D t) {
  115. func1_count++;
  116. }
  117. void func2(Transform3D t, float f) {
  118. func1_count++;
  119. }
  120. void func3(Transform3D t1, Transform3D t2, Transform3D t3, Transform3D t4, Transform3D t5, Transform3D t6) {
  121. func1_count++;
  122. }
  123. Transform3D func1r(Transform3D t) {
  124. func1_count++;
  125. return t;
  126. }
  127. Transform3D func2r(Transform3D t, float f) {
  128. func1_count++;
  129. return t;
  130. }
  131. void add_msg_to_write(TestMsgType type) {
  132. message_types_to_write.push_back(type);
  133. }
  134. void reader_thread_loop() {
  135. reader_threadwork.thread_wait_for_work();
  136. while (!exit_threads) {
  137. if (message_count_to_read < 0) {
  138. command_queue.flush_all();
  139. }
  140. for (int i = 0; i < message_count_to_read; i++) {
  141. command_queue.wait_and_flush();
  142. }
  143. message_count_to_read = 0;
  144. reader_threadwork.thread_done_work();
  145. reader_threadwork.thread_wait_for_work();
  146. }
  147. command_queue.flush_all();
  148. reader_threadwork.thread_done_work();
  149. }
  150. static void static_reader_thread_loop(void *stsvoid) {
  151. SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);
  152. sts->reader_thread_loop();
  153. }
  154. void writer_thread_loop() {
  155. during_writing = false;
  156. writer_threadwork.thread_wait_for_work();
  157. while (!exit_threads) {
  158. Transform3D tr;
  159. Transform3D otr;
  160. float f = 1;
  161. during_writing = true;
  162. for (int i = 0; i < message_types_to_write.size(); i++) {
  163. TestMsgType msg_type = message_types_to_write[i];
  164. switch (msg_type) {
  165. case TEST_MSG_FUNC1_TRANSFORM:
  166. command_queue.push(this, &SharedThreadState::func1, tr);
  167. break;
  168. case TEST_MSG_FUNC2_TRANSFORM_FLOAT:
  169. command_queue.push(this, &SharedThreadState::func2, tr, f);
  170. break;
  171. case TEST_MSG_FUNC3_TRANSFORMx6:
  172. command_queue.push(this, &SharedThreadState::func3, tr, tr, tr, tr, tr, tr);
  173. break;
  174. case TEST_MSGSYNC_FUNC1_TRANSFORM:
  175. command_queue.push_and_sync(this, &SharedThreadState::func1, tr);
  176. break;
  177. case TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT:
  178. command_queue.push_and_sync(this, &SharedThreadState::func2, tr, f);
  179. break;
  180. case TEST_MSGRET_FUNC1_TRANSFORM:
  181. command_queue.push_and_ret(this, &SharedThreadState::func1r, tr, &otr);
  182. break;
  183. case TEST_MSGRET_FUNC2_TRANSFORM_FLOAT:
  184. command_queue.push_and_ret(this, &SharedThreadState::func2r, tr, f, &otr);
  185. break;
  186. default:
  187. break;
  188. }
  189. }
  190. message_types_to_write.clear();
  191. during_writing = false;
  192. writer_threadwork.thread_done_work();
  193. writer_threadwork.thread_wait_for_work();
  194. }
  195. writer_threadwork.thread_done_work();
  196. }
  197. static void static_writer_thread_loop(void *stsvoid) {
  198. SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);
  199. sts->writer_thread_loop();
  200. }
  201. void init_threads() {
  202. reader_thread.start(&SharedThreadState::static_reader_thread_loop, this);
  203. writer_thread.start(&SharedThreadState::static_writer_thread_loop, this);
  204. }
  205. void destroy_threads() {
  206. exit_threads = true;
  207. reader_threadwork.main_start_work();
  208. writer_threadwork.main_start_work();
  209. reader_thread.wait_to_finish();
  210. writer_thread.wait_to_finish();
  211. }
  212. };
  213. TEST_CASE("[CommandQueue] Test Queue Basics") {
  214. const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
  215. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
  216. SharedThreadState sts;
  217. sts.init_threads();
  218. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
  219. sts.writer_threadwork.main_start_work();
  220. sts.writer_threadwork.main_wait_for_done();
  221. CHECK_MESSAGE(sts.func1_count == 0,
  222. "Control: no messages read before reader has run.");
  223. sts.message_count_to_read = 1;
  224. sts.reader_threadwork.main_start_work();
  225. sts.reader_threadwork.main_wait_for_done();
  226. CHECK_MESSAGE(sts.func1_count == 1,
  227. "Reader should have read one message");
  228. sts.message_count_to_read = -1;
  229. sts.reader_threadwork.main_start_work();
  230. sts.reader_threadwork.main_wait_for_done();
  231. CHECK_MESSAGE(sts.func1_count == 1,
  232. "Reader should have read no additional messages from flush_all");
  233. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
  234. sts.writer_threadwork.main_start_work();
  235. sts.writer_threadwork.main_wait_for_done();
  236. sts.message_count_to_read = -1;
  237. sts.reader_threadwork.main_start_work();
  238. sts.reader_threadwork.main_wait_for_done();
  239. CHECK_MESSAGE(sts.func1_count == 2,
  240. "Reader should have read one additional message from flush_all");
  241. sts.destroy_threads();
  242. CHECK_MESSAGE(sts.func1_count == 2,
  243. "Reader should have read no additional messages after join");
  244. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
  245. ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
  246. }
  247. TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {
  248. const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
  249. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
  250. SharedThreadState sts;
  251. sts.init_threads();
  252. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  253. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  254. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
  255. sts.writer_threadwork.main_start_work();
  256. sts.writer_threadwork.main_wait_for_done();
  257. sts.message_count_to_read = -1;
  258. sts.reader_threadwork.main_start_work();
  259. sts.reader_threadwork.main_wait_for_done();
  260. CHECK_MESSAGE(sts.func1_count == 3,
  261. "Reader should have read at least three messages");
  262. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  263. sts.writer_threadwork.main_start_work();
  264. sts.writer_threadwork.main_wait_for_done();
  265. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
  266. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  267. sts.writer_threadwork.main_start_work();
  268. OS::get_singleton()->delay_usec(1000);
  269. sts.message_count_to_read = -1;
  270. sts.reader_threadwork.main_start_work();
  271. OS::get_singleton()->delay_usec(1000);
  272. sts.writer_threadwork.main_wait_for_done();
  273. sts.reader_threadwork.main_wait_for_done();
  274. CHECK_MESSAGE(sts.func1_count >= 3,
  275. "Reader should have read at least three messages");
  276. sts.message_count_to_read = 6 - sts.func1_count;
  277. sts.reader_threadwork.main_start_work();
  278. // The following will fail immediately.
  279. // The reason it hangs indefinitely in engine, is all subsequent calls to
  280. // CommandQueue.wait_and_flush_one will also fail.
  281. sts.reader_threadwork.main_wait_for_done();
  282. // Because looping around uses an extra message, easiest to consume all.
  283. sts.message_count_to_read = -1;
  284. sts.reader_threadwork.main_start_work();
  285. sts.reader_threadwork.main_wait_for_done();
  286. CHECK_MESSAGE(sts.func1_count == 6,
  287. "Reader should have read both message sets");
  288. sts.destroy_threads();
  289. CHECK_MESSAGE(sts.func1_count == 6,
  290. "Reader should have read no additional messages after join");
  291. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
  292. ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
  293. }
  294. TEST_CASE("[CommandQueue] Test Queue Lapping") {
  295. const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
  296. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
  297. SharedThreadState sts;
  298. sts.init_threads();
  299. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
  300. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  301. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  302. sts.writer_threadwork.main_start_work();
  303. sts.writer_threadwork.main_wait_for_done();
  304. // We need to read an extra message so that it triggers the dealloc logic once.
  305. // Otherwise, the queue will be considered full.
  306. sts.message_count_to_read = 3;
  307. sts.reader_threadwork.main_start_work();
  308. sts.reader_threadwork.main_wait_for_done();
  309. CHECK_MESSAGE(sts.func1_count == 3,
  310. "Reader should have read first set of messages");
  311. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  312. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
  313. sts.writer_threadwork.main_start_work();
  314. // Don't wait for these, because the queue isn't big enough.
  315. sts.writer_threadwork.main_wait_for_done();
  316. sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC2_TRANSFORM_FLOAT);
  317. sts.writer_threadwork.main_start_work();
  318. OS::get_singleton()->delay_usec(1000);
  319. sts.message_count_to_read = 3;
  320. sts.reader_threadwork.main_start_work();
  321. sts.reader_threadwork.main_wait_for_done();
  322. sts.writer_threadwork.main_wait_for_done();
  323. sts.message_count_to_read = -1;
  324. sts.reader_threadwork.main_start_work();
  325. sts.reader_threadwork.main_wait_for_done();
  326. CHECK_MESSAGE(sts.func1_count == 6,
  327. "Reader should have read rest of the messages after lapping writers.");
  328. sts.destroy_threads();
  329. CHECK_MESSAGE(sts.func1_count == 6,
  330. "Reader should have read no additional messages after join");
  331. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
  332. ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
  333. }
  334. TEST_CASE("[Stress][CommandQueue] Stress test command queue") {
  335. const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
  336. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
  337. SharedThreadState sts;
  338. sts.init_threads();
  339. RandomNumberGenerator rng;
  340. rng.set_seed(1837267);
  341. int msgs_to_add = 2048;
  342. for (int i = 0; i < msgs_to_add; i++) {
  343. // randi_range is inclusive, so allow any enum value except MAX.
  344. sts.add_msg_to_write((SharedThreadState::TestMsgType)rng.randi_range(0, SharedThreadState::TEST_MSG_MAX - 1));
  345. }
  346. sts.writer_threadwork.main_start_work();
  347. int max_loop_iters = msgs_to_add * 2;
  348. int loop_iters = 0;
  349. while (sts.func1_count < msgs_to_add && loop_iters < max_loop_iters) {
  350. int remaining = (msgs_to_add - sts.func1_count);
  351. sts.message_count_to_read = rng.randi_range(1, remaining < 128 ? remaining : 128);
  352. if (loop_iters % 3 == 0) {
  353. sts.message_count_to_read = -1;
  354. }
  355. sts.reader_threadwork.main_start_work();
  356. sts.reader_threadwork.main_wait_for_done();
  357. loop_iters++;
  358. }
  359. CHECK_MESSAGE(loop_iters < max_loop_iters,
  360. "Reader needed too many iterations to read messages!");
  361. sts.writer_threadwork.main_wait_for_done();
  362. sts.destroy_threads();
  363. CHECK_MESSAGE(sts.func1_count == msgs_to_add,
  364. "Reader should have read no additional messages after join");
  365. ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
  366. ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
  367. }
  368. } // namespace TestCommandQueue
  369. #endif // !defined(NO_THREADS)
  370. #endif // TEST_COMMAND_QUEUE_H