Browse Source

Refactor CommandQueueMT

* RingBuffer had no reason to be in this context
* A single buffer is used that can grow as much as the game needs.

This should make thread loading entirely reliable.
reduz 4 years ago
parent
commit
c66b2651a6

+ 0 - 29
core/templates/command_queue_mt.cpp

@@ -70,35 +70,7 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
 	return &sync_sems[idx];
 }
 
-bool CommandQueueMT::dealloc_one() {
-tryagain:
-	if (dealloc_ptr == (write_ptr_and_epoch >> 1)) {
-		// The queue is empty
-		return false;
-	}
-
-	uint32_t size = *(uint32_t *)&command_mem[dealloc_ptr];
-
-	if (size == 0) {
-		// End of command buffer wrap down
-		dealloc_ptr = 0;
-		goto tryagain;
-	}
-
-	if (size & 1) {
-		// Still used, nothing can be deallocated
-		return false;
-	}
-
-	dealloc_ptr += (size >> 1) + 8;
-	return true;
-}
-
 CommandQueueMT::CommandQueueMT(bool p_sync) {
-	command_mem_size = GLOBAL_DEF_RST("memory/limits/command_queue/multithreading_queue_size_kb", DEFAULT_COMMAND_MEM_SIZE_KB);
-	ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/command_queue/multithreading_queue_size_kb", PropertyInfo(Variant::INT, "memory/limits/command_queue/multithreading_queue_size_kb", PROPERTY_HINT_RANGE, "1,4096,1,or_greater"));
-	command_mem_size *= 1024;
-	command_mem = (uint8_t *)memalloc(command_mem_size);
 	if (p_sync) {
 		sync = memnew(Semaphore);
 	}
@@ -108,5 +80,4 @@ CommandQueueMT::~CommandQueueMT() {
 	if (sync) {
 		memdelete(sync);
 	}
-	memfree(command_mem);
 }

+ 32 - 129
core/templates/command_queue_mt.h

@@ -34,6 +34,8 @@
 #include "core/os/memory.h"
 #include "core/os/mutex.h"
 #include "core/os/semaphore.h"
+#include "core/string/print_string.h"
+#include "core/templates/local_vector.h"
 #include "core/templates/simple_type.h"
 #include "core/typedefs.h"
 
@@ -334,11 +336,7 @@ class CommandQueueMT {
 		SYNC_SEMAPHORES = 8
 	};
 
-	uint8_t *command_mem = nullptr;
-	uint32_t read_ptr_and_epoch = 0;
-	uint32_t write_ptr_and_epoch = 0;
-	uint32_t dealloc_ptr = 0;
-	uint32_t command_mem_size = 0;
+	LocalVector<uint8_t> command_mem;
 	SyncSemaphore sync_sems[SYNC_SEMAPHORES];
 	Mutex mutex;
 	Semaphore *sync = nullptr;
@@ -346,138 +344,47 @@ class CommandQueueMT {
 	template <class T>
 	T *allocate() {
 		// alloc size is size+T+safeguard
-		uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1)) + 8;
-
-		// Assert that the buffer is big enough to hold at least two messages.
-		ERR_FAIL_COND_V(alloc_size * 2 + sizeof(uint32_t) > command_mem_size, nullptr);
-
-	tryagain:
-		uint32_t write_ptr = write_ptr_and_epoch >> 1;
-
-		if (write_ptr < dealloc_ptr) {
-			// behind dealloc_ptr, check that there is room
-			if ((dealloc_ptr - write_ptr) <= alloc_size) {
-				// There is no more room, try to deallocate something
-				if (dealloc_one()) {
-					goto tryagain;
-				}
-				return nullptr;
-			}
-		} else {
-			// ahead of dealloc_ptr, check that there is room
-
-			if ((command_mem_size - write_ptr) < alloc_size + sizeof(uint32_t)) {
-				// no room at the end, wrap down;
-
-				if (dealloc_ptr == 0) { // don't want write_ptr to become dealloc_ptr
-
-					// There is no more room, try to deallocate something
-					if (dealloc_one()) {
-						goto tryagain;
-					}
-					return nullptr;
-				}
-
-				// if this happens, it's a bug
-				ERR_FAIL_COND_V((command_mem_size - write_ptr) < 8, nullptr);
-				// zero means, wrap to beginning
-
-				uint32_t *p = (uint32_t *)&command_mem[write_ptr];
-				*p = 1;
-				write_ptr_and_epoch = 0 | (1 & ~write_ptr_and_epoch); // Invert epoch.
-				// See if we can get the thread to run and clear up some more space while we wait.
-				// This is required if alloc_size * 2 + 4 > COMMAND_MEM_SIZE
-				if (sync) {
-					sync->post();
-				}
-				goto tryagain;
-			}
-		}
-		// Allocate the size and the 'in use' bit.
-		// First bit used to mark if command is still in use (1)
-		// or if it has been destroyed and can be deallocated (0).
-		uint32_t size = (sizeof(T) + 8 - 1) & ~(8 - 1);
-		uint32_t *p = (uint32_t *)&command_mem[write_ptr];
-		*p = (size << 1) | 1;
-		write_ptr += 8;
-		// allocate the command
-		T *cmd = memnew_placement(&command_mem[write_ptr], T);
-		write_ptr += size;
-		write_ptr_and_epoch = (write_ptr << 1) | (write_ptr_and_epoch & 1);
+		uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1));
+		uint64_t size = command_mem.size();
+		command_mem.resize(size + alloc_size + 8);
+		*(uint64_t *)&command_mem[size] = alloc_size;
+		T *cmd = memnew_placement(&command_mem[size + 8], T);
 		return cmd;
 	}
 
 	template <class T>
 	T *allocate_and_lock() {
 		lock();
-		T *ret;
-
-		while ((ret = allocate<T>()) == nullptr) {
-			unlock();
-			// sleep a little until fetch happened and some room is made
-			wait_for_flush();
-			lock();
-		}
-
+		T *ret = allocate<T>();
 		return ret;
 	}
 
-	bool flush_one(bool p_lock = true) {
-		if (p_lock) {
-			lock();
-		}
-	tryagain:
-
-		// tried to read an empty queue
-		if (read_ptr_and_epoch == write_ptr_and_epoch) {
-			if (p_lock) {
-				unlock();
-			}
-			return false;
-		}
-
-		uint32_t read_ptr = read_ptr_and_epoch >> 1;
-		uint32_t size_ptr = read_ptr;
-		uint32_t size = *(uint32_t *)&command_mem[read_ptr] >> 1;
-
-		if (size == 0) {
-			*(uint32_t *)&command_mem[read_ptr] = 0; // clear in-use bit.
-			//end of ringbuffer, wrap
-			read_ptr_and_epoch = 0 | (1 & ~read_ptr_and_epoch); // Invert epoch.
-			goto tryagain;
-		}
-
-		read_ptr += 8;
+	void _flush() {
+		lock();
 
-		CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
+		uint64_t read_ptr = 0;
+		uint64_t limit = command_mem.size();
 
-		read_ptr += size;
+		while (read_ptr < limit) {
+			uint64_t size = *(uint64_t *)&command_mem[read_ptr];
+			read_ptr += 8;
+			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
 
-		read_ptr_and_epoch = (read_ptr << 1) | (read_ptr_and_epoch & 1);
+			cmd->call(); //execute the function
+			cmd->post(); //release in case it needs sync/ret
+			cmd->~CommandBase(); //should be done, so erase the command
 
-		if (p_lock) {
-			unlock();
-		}
-		cmd->call();
-		if (p_lock) {
-			lock();
+			read_ptr += size;
 		}
 
-		cmd->post();
-		cmd->~CommandBase();
-		*(uint32_t *)&command_mem[size_ptr] &= ~1;
-
-		if (p_lock) {
-			unlock();
-		}
-		return true;
+		command_mem.clear();
+		unlock();
 	}
 
 	void lock();
 	void unlock();
 	void wait_for_flush();
 	SyncSemaphore *_alloc_sync_sem();
-	bool dealloc_one();
 
 public:
 	/* NORMAL PUSH COMMANDS */
@@ -492,23 +399,19 @@ public:
 	DECL_PUSH_AND_SYNC(0)
 	SPACE_SEP_LIST(DECL_PUSH_AND_SYNC, 15)
 
-	void wait_and_flush_one() {
-		ERR_FAIL_COND(!sync);
-		sync->wait();
-		flush_one();
-	}
-
 	_FORCE_INLINE_ void flush_if_pending() {
-		if (unlikely(read_ptr_and_epoch != write_ptr_and_epoch)) {
-			flush_all();
+		if (unlikely(command_mem.size() > 0)) {
+			_flush();
 		}
 	}
 	void flush_all() {
-		//ERR_FAIL_COND(sync);
-		lock();
-		while (flush_one(false)) {
-		}
-		unlock();
+		_flush();
+	}
+
+	void wait_and_flush() {
+		ERR_FAIL_COND(!sync);
+		sync->wait();
+		_flush();
 	}
 
 	CommandQueueMT(bool p_sync);

+ 0 - 2
doc/classes/ProjectSettings.xml

@@ -1139,8 +1139,6 @@
 		<member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;">
 			Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
 		</member>
-		<member name="memory/limits/command_queue/multithreading_queue_size_kb" type="int" setter="" getter="" default="256">
-		</member>
 		<member name="memory/limits/message_queue/max_size_kb" type="int" setter="" getter="" default="4096">
 			Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
 		</member>

+ 1 - 1
servers/physics_2d/physics_server_2d_wrap_mt.cpp

@@ -56,7 +56,7 @@ void PhysicsServer2DWrapMT::thread_loop() {
 	step_thread_up.set();
 	while (!exit.is_set()) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all

+ 1 - 1
servers/physics_3d/physics_server_3d_wrap_mt.cpp

@@ -56,7 +56,7 @@ void PhysicsServer3DWrapMT::thread_loop() {
 	step_thread_up = true;
 	while (!exit) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all

+ 1 - 1
servers/rendering/rendering_server_default.cpp

@@ -358,7 +358,7 @@ void RenderingServerDefault::_thread_loop() {
 	draw_thread_up.set();
 	while (!exit.is_set()) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all

+ 1 - 45
tests/test_command_queue.h

@@ -156,7 +156,7 @@ public:
 				command_queue.flush_all();
 			}
 			for (int i = 0; i < message_count_to_read; i++) {
-				command_queue.wait_and_flush_one();
+				command_queue.wait_and_flush();
 			}
 			message_count_to_read = 0;
 
@@ -276,50 +276,6 @@ TEST_CASE("[CommandQueue] Test Queue Basics") {
 			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
 }
 
-TEST_CASE("[CommandQueue] Test Waiting at Queue Full") {
-	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
-	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
-	SharedThreadState sts;
-	sts.init_threads();
-
-	int msgs_to_add = 24; // a queue of size 1kB fundamentally cannot fit 24 matrices.
-	for (int i = 0; i < msgs_to_add; i++) {
-		sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
-	}
-	sts.writer_threadwork.main_start_work();
-	// If we call main_wait_for_done, we will deadlock. So instead...
-	sts.message_count_to_read = 1;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count == 1,
-			"Reader should have read one message");
-	CHECK_MESSAGE(sts.during_writing,
-			"Writer thread should still be blocked on writing.");
-	sts.message_count_to_read = msgs_to_add - 3;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count >= msgs_to_add - 3,
-			"Reader should have read most messages");
-	sts.writer_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.during_writing == false,
-			"Writer thread should no longer be blocked on writing.");
-	sts.message_count_to_read = 2;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	sts.message_count_to_read = -1;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
-			"Reader should have read all messages");
-
-	sts.destroy_threads();
-
-	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
-			"Reader should have read no additional messages after join");
-	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
-			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
-}
-
 TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {
 	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
 	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);