浏览代码

Merge pull request #112506 from RandomShaper/less_locky_cmd_queue

CommandQueueMT: Reduce contention + Fix race conditions
Thaddeus Crews 3 周之前
父节点
当前提交
0e182ee83d

+ 32 - 8
core/templates/command_queue_mt.h

@@ -39,6 +39,8 @@
 #include "core/typedefs.h"
 #include "core/typedefs.h"
 
 
 class CommandQueueMT {
 class CommandQueueMT {
+	static const size_t MAX_COMMAND_SIZE = 1024;
+
 	struct CommandBase {
 	struct CommandBase {
 		bool sync = false;
 		bool sync = false;
 		virtual void call() = 0;
 		virtual void call() = 0;
@@ -105,6 +107,7 @@ class CommandQueueMT {
 
 
 	static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
 	static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
 
 
+	bool unique_flusher = false;
 	BinaryMutex mutex;
 	BinaryMutex mutex;
 	LocalVector<uint8_t> command_mem;
 	LocalVector<uint8_t> command_mem;
 	ConditionVariable sync_cond_var;
 	ConditionVariable sync_cond_var;
@@ -154,29 +157,46 @@ class CommandQueueMT {
 	}
 	}
 
 
 	void _flush() {
 	void _flush() {
+		MutexLock lock(mutex);
+
 		if (unlikely(flush_read_ptr)) {
 		if (unlikely(flush_read_ptr)) {
 			// Re-entrant call.
 			// Re-entrant call.
 			return;
 			return;
 		}
 		}
 
 
-		MutexLock lock(mutex);
+		char cmd_backup[MAX_COMMAND_SIZE];
 
 
 		while (flush_read_ptr < command_mem.size()) {
 		while (flush_read_ptr < command_mem.size()) {
 			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
 			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
-			flush_read_ptr += 8;
+			flush_read_ptr += sizeof(uint64_t);
+
 			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
 			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
-			uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
-			cmd->call();
-			WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
+
+			// Protect against race condition between this thread
+			// during the call to the command and other threads potentially
+			// invalidating the pointer due to reallocs.
+			memcpy(cmd_backup, (char *)cmd, size);
+
+			if (unique_flusher) {
+				// A single thread will pump; the lock is only needed for the command queue itself.
+				lock.temp_unlock();
+				((CommandBase *)cmd_backup)->call();
+				lock.temp_relock();
+			} else {
+				// At least we can unlock during WTP operations.
+				uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
+				((CommandBase *)cmd_backup)->call();
+				WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
+			}
 
 
 			// Handle potential realloc due to the command and unlock allowance.
 			// Handle potential realloc due to the command and unlock allowance.
 			cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
 			cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
 
 
 			if (unlikely(cmd->sync)) {
 			if (unlikely(cmd->sync)) {
 				sync_head++;
 				sync_head++;
-				lock.~MutexLock(); // Give an opportunity to awaiters right away.
+				lock.temp_unlock(); // Give an opportunity to awaiters right away.
 				sync_cond_var.notify_all();
 				sync_cond_var.notify_all();
-				new (&lock) MutexLock(mutex);
+				lock.temp_relock();
 				// Handle potential realloc happened during unlock.
 				// Handle potential realloc happened during unlock.
 				cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
 				cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
 			}
 			}
@@ -210,6 +230,7 @@ public:
 	void push(T *p_instance, M p_method, Args &&...p_args) {
 	void push(T *p_instance, M p_method, Args &&...p_args) {
 		// Standard command, no sync.
 		// Standard command, no sync.
 		using CommandType = Command<T, M, false, Args...>;
 		using CommandType = Command<T, M, false, Args...>;
+		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
 		_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
 		_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
 	}
 	}
 
 
@@ -217,6 +238,7 @@ public:
 	void push_and_sync(T *p_instance, M p_method, Args... p_args) {
 	void push_and_sync(T *p_instance, M p_method, Args... p_args) {
 		// Standard command, sync.
 		// Standard command, sync.
 		using CommandType = Command<T, M, true, Args...>;
 		using CommandType = Command<T, M, true, Args...>;
+		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
 		_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
 		_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
 	}
 	}
 
 
@@ -224,6 +246,7 @@ public:
 	void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
 	void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
 		// Command with return value, sync.
 		// Command with return value, sync.
 		using CommandType = CommandRet<T, M, R, Args...>;
 		using CommandType = CommandRet<T, M, R, Args...>;
+		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
 		_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
 		_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
 	}
 	}
 
 
@@ -252,7 +275,8 @@ public:
 		pump_task_id = p_task_id;
 		pump_task_id = p_task_id;
 	}
 	}
 
 
-	CommandQueueMT() {
+	CommandQueueMT(bool p_unique_flusher = false) :
+			unique_flusher(p_unique_flusher) {
 		command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024);
 		command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024);
 	}
 	}
 };
 };

+ 1 - 1
modules/betsy/image_compress_betsy.h

@@ -103,7 +103,7 @@ Error _betsy_compress_s3tc(Image *r_img, Image::UsedChannels p_channels);
 class BetsyCompressor : public Object {
 class BetsyCompressor : public Object {
 	GDSOFTCLASS(BetsyCompressor, Object);
 	GDSOFTCLASS(BetsyCompressor, Object);
 
 
-	mutable CommandQueueMT command_queue;
+	mutable CommandQueueMT command_queue = CommandQueueMT(true);
 	bool exit = false;
 	bool exit = false;
 	WorkerThreadPool::TaskID task_id = WorkerThreadPool::INVALID_TASK_ID;
 	WorkerThreadPool::TaskID task_id = WorkerThreadPool::INVALID_TASK_ID;
 
 

+ 1 - 1
servers/rendering/rendering_server_default.h

@@ -74,7 +74,7 @@ class RenderingServerDefault : public RenderingServer {
 	uint64_t print_frame_profile_ticks_from = 0;
 	uint64_t print_frame_profile_ticks_from = 0;
 	uint32_t print_frame_profile_frame_count = 0;
 	uint32_t print_frame_profile_frame_count = 0;
 
 
-	mutable CommandQueueMT command_queue;
+	mutable CommandQueueMT command_queue = CommandQueueMT(true);
 
 
 	Thread::ID server_thread = Thread::MAIN_ID;
 	Thread::ID server_thread = Thread::MAIN_ID;
 	WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID;
 	WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID;