Browse Source

Merge pull request #77229 from RandomShaper/fix_mt_issues

Fix message queue issues
Rémi Verschelde 2 years ago
parent
commit
9ad9820f5d
3 changed files with 39 additions and 3 deletions
  1. 3 1
      core/io/resource_loader.cpp
  2. 30 2
      core/object/message_queue.cpp
  3. 6 0
      core/object/message_queue.h

+ 3 - 1
core/io/resource_loader.cpp

@@ -309,6 +309,9 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
 	// --
 
 	Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress);
+	if (mq_override) {
+		mq_override->flush();
+	}
 
 	thread_load_mutex.lock();
 
@@ -354,7 +357,6 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
 
 	if (load_nesting == 0 && mq_override) {
 		memdelete(mq_override);
-		MessageQueue::set_thread_singleton_override(nullptr);
 	}
 }
 

+ 30 - 2
core/object/message_queue.cpp

@@ -35,10 +35,22 @@
 #include "core/object/class_db.h"
 #include "core/object/script_language.h"
 
+#ifdef DEV_ENABLED
+// Includes sanity checks to ensure that a queue set as a thread singleton override
+// is only ever called from the thread it was set for.
+#define LOCK_MUTEX                                     \
+	if (this != MessageQueue::thread_singleton) {      \
+		DEV_ASSERT(!this->is_current_thread_override); \
+		mutex.lock();                                  \
+	} else {                                           \
+		DEV_ASSERT(this->is_current_thread_override);  \
+	}
+#else
 #define LOCK_MUTEX                                \
 	if (this != MessageQueue::thread_singleton) { \
 		mutex.lock();                             \
 	}
+#endif
 
 #define UNLOCK_MUTEX                              \
 	if (this != MessageQueue::thread_singleton) { \
@@ -213,8 +225,8 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args
 Error CallQueue::flush() {
 	LOCK_MUTEX;
 
-	// Non-main threads are not meant to be flushed, but appended to the main one.
-	if (this != MessageQueue::main_singleton) {
+	// Thread overrides are not meant to be flushed, but appended to the main one.
+	if (this == MessageQueue::thread_singleton) {
 		if (pages.size() == 0) {
 			return OK;
 		}
@@ -237,6 +249,7 @@ Error CallQueue::flush() {
 				uint32_t dst_offset = mq->page_bytes[dst_page];
 				if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
 					memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
+					mq->page_bytes[dst_page] += page_bytes[0];
 					src_page++;
 				}
 			}
@@ -520,6 +533,10 @@ CallQueue::~CallQueue() {
 	if (!allocator_is_custom) {
 		memdelete(allocator);
 	}
+	// This is done here to avoid a circular dependency between the sanity checks and the thread singleton pointer.
+	if (this == MessageQueue::thread_singleton) {
+		MessageQueue::thread_singleton = nullptr;
+	}
 }
 
 //////////////////////
@@ -528,7 +545,18 @@ CallQueue *MessageQueue::main_singleton = nullptr;
 thread_local CallQueue *MessageQueue::thread_singleton = nullptr;
 
 void MessageQueue::set_thread_singleton_override(CallQueue *p_thread_singleton) {
+	DEV_ASSERT(p_thread_singleton); // To unset the thread singleton, don't call this with nullptr, but just memfree() it.
+#ifdef DEV_ENABLED
+	if (thread_singleton) {
+		thread_singleton->is_current_thread_override = false;
+	}
+#endif
 	thread_singleton = p_thread_singleton;
+#ifdef DEV_ENABLED
+	if (thread_singleton) {
+		thread_singleton->is_current_thread_override = true;
+	}
+#endif
 }
 
 MessageQueue::MessageQueue() :

+ 6 - 0
core/object/message_queue.h

@@ -40,6 +40,8 @@
 class Object;
 
 class CallQueue {
+	friend class MessageQueue;
+
 public:
 	enum {
 		PAGE_SIZE_BYTES = 4096
@@ -75,6 +77,10 @@ private:
 	uint32_t pages_used = 0;
 	bool flushing = false;
 
+#ifdef DEV_ENABLED
+	bool is_current_thread_override = false;
+#endif
+
 	struct Message {
 		Callable callable;
 		int16_t type;