Browse Source

Merge pull request #42315 from lyuma/command_queue_fix

core/command_queue_mt.h: Fix crash/hang when buffer fills up
Rémi Verschelde 4 years ago
parent
commit
88a3db5bff
4 changed files with 514 additions and 12 deletions
  1. 6 1
      core/command_queue_mt.cpp
  2. 25 11
      core/command_queue_mt.h
  3. 482 0
      tests/test_command_queue.h
  4. 1 0
      tests/test_main.cpp

+ 6 - 1
core/command_queue_mt.cpp

@@ -31,6 +31,7 @@
 #include "command_queue_mt.h"
 
 #include "core/os/os.h"
+#include "core/project_settings.h"
 
 void CommandQueueMT::lock() {
 	mutex.lock();
@@ -71,7 +72,7 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
 
 bool CommandQueueMT::dealloc_one() {
 tryagain:
-	if (dealloc_ptr == write_ptr) {
+	if (dealloc_ptr == (write_ptr_and_epoch >> 1)) {
 		// The queue is empty
 		return false;
 	}
@@ -94,6 +95,10 @@ tryagain:
 }
 
 CommandQueueMT::CommandQueueMT(bool p_sync) {
+	command_mem_size = GLOBAL_DEF_RST("memory/limits/command_queue/multithreading_queue_size_kb", DEFAULT_COMMAND_MEM_SIZE_KB);
+	ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/command_queue/multithreading_queue_size_kb", PropertyInfo(Variant::INT, "memory/limits/command_queue/multithreading_queue_size_kb", PROPERTY_HINT_RANGE, "1,4096,1,or_greater"));
+	command_mem_size *= 1024;
+	command_mem = (uint8_t *)memalloc(command_mem_size);
 	if (p_sync) {
 		sync = memnew(Semaphore);
 	}

+ 25 - 11
core/command_queue_mt.h

@@ -330,15 +330,15 @@ class CommandQueueMT {
 	/***** BASE *******/
 
 	enum {
-		COMMAND_MEM_SIZE_KB = 256,
-		COMMAND_MEM_SIZE = COMMAND_MEM_SIZE_KB * 1024,
+		DEFAULT_COMMAND_MEM_SIZE_KB = 256,
 		SYNC_SEMAPHORES = 8
 	};
 
-	uint8_t *command_mem = (uint8_t *)memalloc(COMMAND_MEM_SIZE);
-	uint32_t read_ptr = 0;
-	uint32_t write_ptr = 0;
+	uint8_t *command_mem = nullptr;
+	uint32_t read_ptr_and_epoch = 0;
+	uint32_t write_ptr_and_epoch = 0;
 	uint32_t dealloc_ptr = 0;
+	uint32_t command_mem_size = 0;
 	SyncSemaphore sync_sems[SYNC_SEMAPHORES];
 	Mutex mutex;
 	Semaphore *sync = nullptr;
@@ -348,7 +348,11 @@ class CommandQueueMT {
 		// alloc size is size+T+safeguard
 		uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1)) + 8;
 
+		// Assert that the buffer is big enough to hold at least two messages.
+		ERR_FAIL_COND_V(alloc_size * 2 + sizeof(uint32_t) > command_mem_size, nullptr);
+
 	tryagain:
+		uint32_t write_ptr = write_ptr_and_epoch >> 1;
 
 		if (write_ptr < dealloc_ptr) {
 			// behind dealloc_ptr, check that there is room
@@ -362,7 +366,7 @@ class CommandQueueMT {
 		} else {
 			// ahead of dealloc_ptr, check that there is room
 
-			if ((COMMAND_MEM_SIZE - write_ptr) < alloc_size + sizeof(uint32_t)) {
+			if ((command_mem_size - write_ptr) < alloc_size + sizeof(uint32_t)) {
 				// no room at the end, wrap down;
 
 				if (dealloc_ptr == 0) { // don't want write_ptr to become dealloc_ptr
@@ -375,12 +379,17 @@ class CommandQueueMT {
 				}
 
 				// if this happens, it's a bug
-				ERR_FAIL_COND_V((COMMAND_MEM_SIZE - write_ptr) < 8, nullptr);
+				ERR_FAIL_COND_V((command_mem_size - write_ptr) < 8, nullptr);
 				// a size of zero (stored with the in-use bit set) means: wrap to beginning
 
 				uint32_t *p = (uint32_t *)&command_mem[write_ptr];
-				*p = 0;
-				write_ptr = 0;
+				*p = 1;
+				write_ptr_and_epoch = 0 | (1 & ~write_ptr_and_epoch); // Invert epoch.
+				// See if we can get the thread to run and clear up some more space while we wait.
+				// This is required if alloc_size * 2 + 4 > command_mem_size
+				if (sync) {
+					sync->post();
+				}
 				goto tryagain;
 			}
 		}
@@ -394,6 +403,7 @@ class CommandQueueMT {
 		// allocate the command
 		T *cmd = memnew_placement(&command_mem[write_ptr], T);
 		write_ptr += size;
+		write_ptr_and_epoch = (write_ptr << 1) | (write_ptr_and_epoch & 1);
 		return cmd;
 	}
 
@@ -419,19 +429,21 @@ class CommandQueueMT {
 	tryagain:
 
 		// tried to read an empty queue
-		if (read_ptr == write_ptr) {
+		if (read_ptr_and_epoch == write_ptr_and_epoch) {
 			if (p_lock) {
 				unlock();
 			}
 			return false;
 		}
 
+		uint32_t read_ptr = read_ptr_and_epoch >> 1;
 		uint32_t size_ptr = read_ptr;
 		uint32_t size = *(uint32_t *)&command_mem[read_ptr] >> 1;
 
 		if (size == 0) {
+			*(uint32_t *)&command_mem[read_ptr] = 0; // clear in-use bit.
 			//end of ringbuffer, wrap
-			read_ptr = 0;
+			read_ptr_and_epoch = 0 | (1 & ~read_ptr_and_epoch); // Invert epoch.
 			goto tryagain;
 		}
 
@@ -441,6 +453,8 @@ class CommandQueueMT {
 
 		read_ptr += size;
 
+		read_ptr_and_epoch = (read_ptr << 1) | (read_ptr_and_epoch & 1);
+
 		if (p_lock) {
 			unlock();
 		}

+ 482 - 0
tests/test_command_queue.h

@@ -0,0 +1,482 @@
+/*************************************************************************/
+/*  test_command_queue.h                                                 */
+/*************************************************************************/
+/*                       This file is part of:                           */
+/*                           GODOT ENGINE                                */
+/*                      https://godotengine.org                          */
+/*************************************************************************/
+/* Copyright (c) 2007-2020 Juan Linietsky, Ariel Manzur.                 */
+/* Copyright (c) 2014-2020 Godot Engine contributors (cf. AUTHORS.md).   */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#ifndef TEST_COMMAND_QUEUE_H
+#define TEST_COMMAND_QUEUE_H
+
+#include "test_command_queue.h"
+
+#include "core/command_queue_mt.h"
+#include "core/os/mutex.h"
+#include "core/os/os.h"
+#include "core/os/semaphore.h"
+#include "core/os/thread.h"
+#include "core/project_settings.h"
+
+#if !defined(NO_THREADS)
+
+namespace TestCommandQueue {
+
+class ThreadWork { // Turn-taking handshake between the caller of the main_* methods and the caller of the thread_* methods.
+	Semaphore thread_sem; // Posted by main_start_work(), waited on by thread_wait_for_work().
+	Semaphore main_sem; // Posted by thread_done_work(), waited on by main_wait_for_done().
+	Mutex mut; // Locked by the side that currently has the turn; first locked in the constructor.
+	int threading_errors = 0; // Number of calls that found `state` different from what they expected.
+	enum State {
+		MAIN_START, // Set by the constructor and by main_wait_for_done().
+		MAIN_DONE, // Set by main_start_work().
+		THREAD_START, // Set by thread_wait_for_work().
+		THREAD_DONE, // Set by thread_done_work().
+	} state;
+
+public:
+	ThreadWork() {
+		mut.lock(); // The constructing side starts out holding the turn.
+		state = MAIN_START;
+	}
+	~ThreadWork() {
+		CHECK_MESSAGE(threading_errors == 0, "threads did not lock/unlock correctly");
+	}
+	void thread_wait_for_work() { // Blocks on thread_sem, then takes the mutex and expects state MAIN_DONE.
+		thread_sem.wait();
+		mut.lock();
+		if (state != MAIN_DONE) {
+			threading_errors++;
+		}
+		state = THREAD_START;
+	}
+	void thread_done_work() { // Expects state THREAD_START; releases the mutex and posts main_sem.
+		if (state != THREAD_START) {
+			threading_errors++;
+		}
+		state = THREAD_DONE;
+		mut.unlock();
+		main_sem.post();
+	}
+
+	void main_wait_for_done() { // Blocks on main_sem, then takes the mutex and expects state THREAD_DONE.
+		main_sem.wait();
+		mut.lock();
+		if (state != THREAD_DONE) {
+			threading_errors++;
+		}
+		state = MAIN_START;
+	}
+	void main_start_work() { // Expects state MAIN_START; releases the mutex and posts thread_sem.
+		if (state != MAIN_START) {
+			threading_errors++;
+		}
+		state = MAIN_DONE;
+		mut.unlock();
+		thread_sem.post();
+	}
+};
+
+class SharedThreadState { // Test fixture: one CommandQueueMT plus a reader thread and a writer thread driven through ThreadWork handshakes.
+public:
+	ThreadWork reader_threadwork; // Handshake used by reader_thread_loop().
+	ThreadWork writer_threadwork; // Handshake used by writer_thread_loop().
+
+	CommandQueueMT command_queue = CommandQueueMT(true); // The constructor argument is p_sync.
+
+	enum TestMsgType { // Selects which push variant and callback writer_thread_loop() uses for a message.
+		TEST_MSG_FUNC1_TRANSFORM,
+		TEST_MSG_FUNC2_TRANSFORM_FLOAT,
+		TEST_MSG_FUNC3_TRANSFORMx6,
+		TEST_MSGSYNC_FUNC1_TRANSFORM,
+		TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT,
+		TEST_MSGRET_FUNC1_TRANSFORM,
+		TEST_MSGRET_FUNC2_TRANSFORM_FLOAT,
+		TEST_MSG_MAX // Not handled by writer_thread_loop(); used as an upper bound by the stress test.
+	};
+
+	Vector<TestMsgType> message_types_to_write; // Filled by add_msg_to_write(); pushed and then cleared by writer_thread_loop().
+	bool during_writing = false; // True while writer_thread_loop() is inside its push loop.
+	int message_count_to_read = 0; // < 0: reader calls flush_all(); > 0: reader calls wait_and_flush_one() that many times. Reset to 0 after each round.
+	bool exit_threads = false; // Set by destroy_threads(); both loops exit the next time they are woken.
+
+	Thread *reader_thread = nullptr;
+	Thread *writer_thread = nullptr;
+
+	int func1_count = 0; // Incremented by every func* callback below, so it counts all executed commands.
+
+	void func1(Transform t) {
+		func1_count++;
+	}
+	void func2(Transform t, float f) {
+		func1_count++;
+	}
+	void func3(Transform t1, Transform t2, Transform t3, Transform t4, Transform t5, Transform t6) {
+		func1_count++;
+	}
+	Transform func1r(Transform t) {
+		func1_count++;
+		return t;
+	}
+	Transform func2r(Transform t, float f) {
+		func1_count++;
+		return t;
+	}
+
+	void add_msg_to_write(TestMsgType type) { // Queues one message type for the writer's next round.
+		message_types_to_write.push_back(type);
+	}
+
+	void reader_thread_loop() { // Each round: consume commands as directed by message_count_to_read, then hand the turn back.
+		reader_threadwork.thread_wait_for_work();
+		while (!exit_threads) {
+			if (message_count_to_read < 0) {
+				command_queue.flush_all();
+			}
+			for (int i = 0; i < message_count_to_read; i++) {
+				command_queue.wait_and_flush_one();
+			}
+			message_count_to_read = 0;
+
+			reader_threadwork.thread_done_work();
+			reader_threadwork.thread_wait_for_work();
+		}
+		command_queue.flush_all(); // Final flush on the exit path.
+		reader_threadwork.thread_done_work();
+	}
+	static void static_reader_thread_loop(void *stsvoid) { // Thread entry point; stsvoid is the SharedThreadState passed to Thread::create().
+		SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);
+		sts->reader_thread_loop();
+	}
+
+	void writer_thread_loop() { // Each round: push every queued message type, clear the list, then hand the turn back.
+		during_writing = false;
+		writer_threadwork.thread_wait_for_work();
+		while (!exit_threads) {
+			Transform tr;
+			Transform otr; // Receives the return value of the push_and_ret variants.
+			float f = 1;
+			during_writing = true;
+			for (int i = 0; i < message_types_to_write.size(); i++) {
+				TestMsgType msg_type = message_types_to_write[i];
+				switch (msg_type) {
+					case TEST_MSG_FUNC1_TRANSFORM:
+						command_queue.push(this, &SharedThreadState::func1, tr);
+						break;
+					case TEST_MSG_FUNC2_TRANSFORM_FLOAT:
+						command_queue.push(this, &SharedThreadState::func2, tr, f);
+						break;
+					case TEST_MSG_FUNC3_TRANSFORMx6:
+						command_queue.push(this, &SharedThreadState::func3, tr, tr, tr, tr, tr, tr);
+						break;
+					case TEST_MSGSYNC_FUNC1_TRANSFORM:
+						command_queue.push_and_sync(this, &SharedThreadState::func1, tr);
+						break;
+					case TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT:
+						command_queue.push_and_sync(this, &SharedThreadState::func2, tr, f);
+						break;
+					case TEST_MSGRET_FUNC1_TRANSFORM:
+						command_queue.push_and_ret(this, &SharedThreadState::func1r, tr, &otr);
+						break;
+					case TEST_MSGRET_FUNC2_TRANSFORM_FLOAT:
+						command_queue.push_and_ret(this, &SharedThreadState::func2r, tr, f, &otr);
+						break;
+					default: // Any other value (e.g. TEST_MSG_MAX) pushes nothing.
+						break;
+				}
+			}
+			message_types_to_write.clear();
+			during_writing = false;
+
+			writer_threadwork.thread_done_work();
+			writer_threadwork.thread_wait_for_work();
+		}
+		writer_threadwork.thread_done_work();
+	}
+	static void static_writer_thread_loop(void *stsvoid) { // Thread entry point; stsvoid is the SharedThreadState passed to Thread::create().
+		SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);
+		sts->writer_thread_loop();
+	}
+
+	void init_threads() { // Starts both worker threads; each immediately blocks in thread_wait_for_work().
+		reader_thread = Thread::create(&SharedThreadState::static_reader_thread_loop, this);
+		writer_thread = Thread::create(&SharedThreadState::static_writer_thread_loop, this);
+	}
+	void destroy_threads() { // Signals exit, wakes both threads one last time, then joins and frees them.
+		exit_threads = true;
+		reader_threadwork.main_start_work();
+		writer_threadwork.main_start_work();
+
+		Thread::wait_to_finish(reader_thread);
+		memdelete(reader_thread);
+		reader_thread = nullptr;
+		Thread::wait_to_finish(writer_thread);
+		memdelete(writer_thread);
+		writer_thread = nullptr;
+	}
+};
+
+TEST_CASE("[CommandQueue] Test Queue Basics") { // Pushes single messages and checks they are executed only when the reader is told to read.
+	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); // 1 kB queue; set before `sts` is built because the CommandQueueMT constructor reads this setting.
+	SharedThreadState sts;
+	sts.init_threads();
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	sts.writer_threadwork.main_start_work(); // Let the writer push the queued message...
+	sts.writer_threadwork.main_wait_for_done(); // ...and wait until its round is complete.
+	CHECK_MESSAGE(sts.func1_count == 0,
+			"Control: no messages read before reader has run.");
+
+	sts.message_count_to_read = 1; // Reader will call wait_and_flush_one() once.
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 1,
+			"Reader should have read one message");
+
+	sts.message_count_to_read = -1; // Negative: reader will call flush_all().
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 1,
+			"Reader should have read no additional messages from flush_all");
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	sts.writer_threadwork.main_start_work();
+	sts.writer_threadwork.main_wait_for_done();
+
+	sts.message_count_to_read = -1;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 2,
+			"Reader should have read one additional message from flush_all");
+
+	sts.destroy_threads(); // The reader's exit path performs one more flush_all().
+
+	CHECK_MESSAGE(sts.func1_count == 2,
+			"Reader should have read no additional messages after join");
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, // Restore the setting to its revert value.
+			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
+}
+
+TEST_CASE("[CommandQueue] Test Waiting at Queue Full") { // Writer is given more messages than fit; it must block until the reader drains the queue.
+	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); // 1 kB queue; set before `sts` is built because the CommandQueueMT constructor reads this setting.
+	SharedThreadState sts;
+	sts.init_threads();
+
+	int msgs_to_add = 24; // a queue of size 1kB fundamentally cannot fit 24 matrices.
+	for (int i = 0; i < msgs_to_add; i++) {
+		sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	}
+	sts.writer_threadwork.main_start_work();
+	// If we call main_wait_for_done, we will deadlock. So instead...
+	sts.message_count_to_read = 1;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 1,
+			"Reader should have read one message");
+	CHECK_MESSAGE(sts.during_writing, // Read while the writer round is still in progress (no main_wait_for_done on the writer yet).
+			"Writer thread should still be blocked on writing.");
+	sts.message_count_to_read = msgs_to_add - 3;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count >= msgs_to_add - 3,
+			"Reader should have read most messages");
+	sts.writer_threadwork.main_wait_for_done(); // Only now wait for the writer's round to complete.
+	CHECK_MESSAGE(sts.during_writing == false,
+			"Writer thread should no longer be blocked on writing.");
+	sts.message_count_to_read = 2;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	sts.message_count_to_read = -1; // Negative: reader calls flush_all() to pick up anything left.
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
+			"Reader should have read all messages");
+
+	sts.destroy_threads();
+
+	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
+			"Reader should have read no additional messages after join");
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, // Restore the setting to its revert value.
+			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
+}
+
+TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") { // Writes two batches of three messages so the second batch wraps around the 1 kB buffer.
+	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); // 1 kB queue; set before `sts` is built because the CommandQueueMT constructor reads this setting.
+	SharedThreadState sts;
+	sts.init_threads();
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	sts.writer_threadwork.main_start_work();
+	sts.writer_threadwork.main_wait_for_done();
+
+	sts.message_count_to_read = -1; // Negative: reader calls flush_all().
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 3, // NOTE(review): the check is an exact ==, although the message says "at least".
+			"Reader should have read at least three messages");
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.writer_threadwork.main_start_work();
+	sts.writer_threadwork.main_wait_for_done();
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.writer_threadwork.main_start_work(); // Writer and reader run concurrently from here until both are waited on below.
+	OS::get_singleton()->delay_usec(1000);
+
+	sts.message_count_to_read = -1;
+	sts.reader_threadwork.main_start_work();
+	OS::get_singleton()->delay_usec(1000);
+
+	sts.writer_threadwork.main_wait_for_done();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count >= 3,
+			"Reader should have read at least three messages");
+
+	sts.message_count_to_read = 6 - sts.func1_count; // Ask for exactly the messages not yet executed.
+	sts.reader_threadwork.main_start_work();
+
+	// The following will fail immediately.
+	// The reason it hangs indefinitely in engine, is all subsequent calls to
+	// CommandQueue.wait_and_flush_one will also fail.
+	sts.reader_threadwork.main_wait_for_done();
+
+	// Because looping around uses an extra message, easiest to consume all.
+	sts.message_count_to_read = -1;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 6,
+			"Reader should have read both message sets");
+
+	sts.destroy_threads();
+
+	CHECK_MESSAGE(sts.func1_count == 6,
+			"Reader should have read no additional messages after join");
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, // Restore the setting to its revert value.
+			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
+}
+
+TEST_CASE("[CommandQueue] Test Queue Lapping") { // Writer pushes a second and third batch while earlier messages are being consumed from the 1 kB buffer.
+	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); // 1 kB queue; set before `sts` is built because the CommandQueueMT constructor reads this setting.
+	SharedThreadState sts;
+	sts.init_threads();
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.writer_threadwork.main_start_work();
+	sts.writer_threadwork.main_wait_for_done();
+
+	// We need to read an extra message so that it triggers the dealloc logic once.
+	// Otherwise, the queue will be considered full.
+	sts.message_count_to_read = 3;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+	CHECK_MESSAGE(sts.func1_count == 3,
+			"Reader should have read first set of messages");
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);
+	sts.writer_threadwork.main_start_work();
+	// NOTE(review): this comment used to say "Don't wait for these, because the queue isn't big enough.", but the next line does wait - confirm intent.
+	sts.writer_threadwork.main_wait_for_done();
+
+	sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC2_TRANSFORM_FLOAT);
+	sts.writer_threadwork.main_start_work(); // This round is not waited on until after the reader has run below.
+	OS::get_singleton()->delay_usec(1000);
+
+	sts.message_count_to_read = 3;
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+
+	sts.writer_threadwork.main_wait_for_done();
+
+	sts.message_count_to_read = -1; // Negative: reader calls flush_all() to pick up anything left.
+	sts.reader_threadwork.main_start_work();
+	sts.reader_threadwork.main_wait_for_done();
+
+	CHECK_MESSAGE(sts.func1_count == 6,
+			"Reader should have read rest of the messages after lapping writers.");
+
+	sts.destroy_threads();
+
+	CHECK_MESSAGE(sts.func1_count == 6,
+			"Reader should have read no additional messages after join");
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, // Restore the setting to its revert value.
+			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
+}
+
+TEST_CASE("[Stress][CommandQueue] Stress test command queue") { // Pushes 2048 randomly chosen message types through a 1 kB queue while the reader drains in random-sized rounds.
+	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); // 1 kB queue; set before `sts` is built because the CommandQueueMT constructor reads this setting.
+	SharedThreadState sts;
+	sts.init_threads();
+
+	RandomNumberGenerator rng;
+
+	rng.set_seed(1837267); // Fixed seed.
+
+	int msgs_to_add = 2048;
+
+	for (int i = 0; i < msgs_to_add; i++) {
+		// randi_range is inclusive, so allow any enum value except MAX.
+		sts.add_msg_to_write((SharedThreadState::TestMsgType)rng.randi_range(0, SharedThreadState::TEST_MSG_MAX - 1));
+	}
+	sts.writer_threadwork.main_start_work(); // The writer runs concurrently with the reader rounds below.
+
+	int max_loop_iters = msgs_to_add * 2; // Upper bound on reader rounds so a stuck queue ends the loop instead of spinning forever.
+	int loop_iters = 0;
+	while (sts.func1_count < msgs_to_add && loop_iters < max_loop_iters) {
+		int remaining = (msgs_to_add - sts.func1_count);
+		sts.message_count_to_read = rng.randi_range(1, remaining < 128 ? remaining : 128); // Between 1 and min(remaining, 128) messages.
+		if (loop_iters % 3 == 0) {
+			sts.message_count_to_read = -1; // Every third round: flush_all() instead of a fixed count.
+		}
+		sts.reader_threadwork.main_start_work();
+		sts.reader_threadwork.main_wait_for_done();
+		loop_iters++;
+	}
+	CHECK_MESSAGE(loop_iters < max_loop_iters,
+			"Reader needed too many iterations to read messages!");
+	sts.writer_threadwork.main_wait_for_done();
+
+	sts.destroy_threads();
+
+	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
+			"Reader should have read no additional messages after join");
+	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, // Restore the setting to its revert value.
+			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
+}
+
+} // namespace TestCommandQueue
+
+#endif // !defined(NO_THREADS)
+
+#endif // TEST_COMMAND_QUEUE_H

+ 1 - 0
tests/test_main.cpp

@@ -36,6 +36,7 @@
 #include "test_basis.h"
 #include "test_class_db.h"
 #include "test_color.h"
+#include "test_command_queue.h"
 #include "test_expression.h"
 #include "test_gradient.h"
 #include "test_gui.h"