Browse Source

Avoid sync issues in resources with deferred updates

Pedro J. Estébanez 2 years ago
parent
commit
5a4613f551
3 changed files with 129 additions and 59 deletions
  1. 10 0
      core/io/resource_loader.cpp
  2. 110 54
      core/object/message_queue.cpp
  3. 9 5
      core/object/message_queue.h

+ 10 - 0
core/io/resource_loader.cpp

@@ -285,10 +285,15 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
 void ResourceLoader::_thread_load_function(void *p_userdata) {
 void ResourceLoader::_thread_load_function(void *p_userdata) {
 	ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
 	ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
 	// Thread-safe either if it's the current thread or a brand new one.
 	// Thread-safe either if it's the current thread or a brand new one.
+	CallQueue *mq_override = nullptr;
 	if (load_task.first_in_stack) {
 	if (load_task.first_in_stack) {
 		if (!load_task.dependent_path.is_empty()) {
 		if (!load_task.dependent_path.is_empty()) {
 			load_paths_stack.push_back(load_task.dependent_path);
 			load_paths_stack.push_back(load_task.dependent_path);
 		}
 		}
+		if (!Thread::is_main_thread()) {
+			mq_override = memnew(CallQueue);
+			MessageQueue::set_thread_singleton_override(mq_override);
+		}
 	} else {
 	} else {
 		DEV_ASSERT(load_task.dependent_path.is_empty());
 		DEV_ASSERT(load_task.dependent_path.is_empty());
 	}
 	}
@@ -346,6 +351,11 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
 	print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
 	print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
 
 
 	thread_load_mutex.unlock();
 	thread_load_mutex.unlock();
+
+	if (load_task.first_in_stack && mq_override) {
+		memdelete(mq_override);
+		MessageQueue::set_thread_singleton_override(nullptr);
+	}
 }
 }
 
 
 static String _validate_local_path(const String &p_path) {
 static String _validate_local_path(const String &p_path) {

+ 110 - 54
core/object/message_queue.cpp

@@ -35,14 +35,23 @@
 #include "core/object/class_db.h"
 #include "core/object/class_db.h"
 #include "core/object/script_language.h"
 #include "core/object/script_language.h"
 
 
+#define LOCK_MUTEX                                \
+	if (this != MessageQueue::thread_singleton) { \
+		mutex.lock();                             \
+	}
+
+#define UNLOCK_MUTEX                              \
+	if (this != MessageQueue::thread_singleton) { \
+		mutex.unlock();                           \
+	}
+
 void CallQueue::_add_page() {
 void CallQueue::_add_page() {
-	if (pages_used == page_messages.size()) {
+	if (pages_used == page_bytes.size()) {
 		pages.push_back(allocator->alloc());
 		pages.push_back(allocator->alloc());
-		page_messages.push_back(0);
+		page_bytes.push_back(0);
 	}
 	}
-	page_messages[pages_used] = 0;
+	page_bytes[pages_used] = 0;
 	pages_used++;
 	pages_used++;
-	page_offset = 0;
 }
 }
 
 
 Error CallQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
 Error CallQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
@@ -66,15 +75,15 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar
 
 
 	ERR_FAIL_COND_V_MSG(room_needed > uint32_t(PAGE_SIZE_BYTES), ERR_INVALID_PARAMETER, "Message is too large to fit on a page (" + itos(PAGE_SIZE_BYTES) + " bytes), consider passing less arguments.");
 	ERR_FAIL_COND_V_MSG(room_needed > uint32_t(PAGE_SIZE_BYTES), ERR_INVALID_PARAMETER, "Message is too large to fit on a page (" + itos(PAGE_SIZE_BYTES) + " bytes), consider passing less arguments.");
 
 
-	mutex.lock();
+	LOCK_MUTEX;
 
 
 	_ensure_first_page();
 	_ensure_first_page();
 
 
-	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+	if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
 		if (pages_used == max_pages) {
 		if (pages_used == max_pages) {
 			ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. " + error_text);
 			ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. " + error_text);
 			statistics();
 			statistics();
-			mutex.unlock();
+			UNLOCK_MUTEX;
 			return ERR_OUT_OF_MEMORY;
 			return ERR_OUT_OF_MEMORY;
 		}
 		}
 		_add_page();
 		_add_page();
@@ -82,7 +91,7 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar
 
 
 	Page *page = pages[pages_used - 1];
 	Page *page = pages[pages_used - 1];
 
 
-	uint8_t *buffer_end = &page->data[page_offset];
+	uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]];
 
 
 	Message *msg = memnew_placement(buffer_end, Message);
 	Message *msg = memnew_placement(buffer_end, Message);
 	msg->args = p_argcount;
 	msg->args = p_argcount;
@@ -104,21 +113,20 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar
 		*v = *p_args[i];
 		*v = *p_args[i];
 	}
 	}
 
 
-	page_messages[pages_used - 1]++;
-	page_offset += room_needed;
+	page_bytes[pages_used - 1] += room_needed;
 
 
-	mutex.unlock();
+	UNLOCK_MUTEX;
 
 
 	return OK;
 	return OK;
 }
 }
 
 
 Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) {
 Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) {
-	mutex.lock();
+	LOCK_MUTEX;
 	uint32_t room_needed = sizeof(Message) + sizeof(Variant);
 	uint32_t room_needed = sizeof(Message) + sizeof(Variant);
 
 
 	_ensure_first_page();
 	_ensure_first_page();
 
 
-	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+	if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
 		if (pages_used == max_pages) {
 		if (pages_used == max_pages) {
 			String type;
 			String type;
 			if (ObjectDB::get_instance(p_id)) {
 			if (ObjectDB::get_instance(p_id)) {
@@ -127,14 +135,14 @@ Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant
 			ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
 			ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
 			statistics();
 			statistics();
 
 
-			mutex.unlock();
+			UNLOCK_MUTEX;
 			return ERR_OUT_OF_MEMORY;
 			return ERR_OUT_OF_MEMORY;
 		}
 		}
 		_add_page();
 		_add_page();
 	}
 	}
 
 
 	Page *page = pages[pages_used - 1];
 	Page *page = pages[pages_used - 1];
-	uint8_t *buffer_end = &page->data[page_offset];
+	uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]];
 
 
 	Message *msg = memnew_placement(buffer_end, Message);
 	Message *msg = memnew_placement(buffer_end, Message);
 	msg->args = 1;
 	msg->args = 1;
@@ -146,32 +154,31 @@ Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant
 	Variant *v = memnew_placement(buffer_end, Variant);
 	Variant *v = memnew_placement(buffer_end, Variant);
 	*v = p_value;
 	*v = p_value;
 
 
-	page_messages[pages_used - 1]++;
-	page_offset += room_needed;
-	mutex.unlock();
+	page_bytes[pages_used - 1] += room_needed;
+	UNLOCK_MUTEX;
 
 
 	return OK;
 	return OK;
 }
 }
 
 
 Error CallQueue::push_notification(ObjectID p_id, int p_notification) {
 Error CallQueue::push_notification(ObjectID p_id, int p_notification) {
 	ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER);
 	ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER);
-	mutex.lock();
+	LOCK_MUTEX;
 	uint32_t room_needed = sizeof(Message);
 	uint32_t room_needed = sizeof(Message);
 
 
 	_ensure_first_page();
 	_ensure_first_page();
 
 
-	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+	if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
 		if (pages_used == max_pages) {
 		if (pages_used == max_pages) {
 			ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
 			ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
 			statistics();
 			statistics();
-			mutex.unlock();
+			UNLOCK_MUTEX;
 			return ERR_OUT_OF_MEMORY;
 			return ERR_OUT_OF_MEMORY;
 		}
 		}
 		_add_page();
 		_add_page();
 	}
 	}
 
 
 	Page *page = pages[pages_used - 1];
 	Page *page = pages[pages_used - 1];
-	uint8_t *buffer_end = &page->data[page_offset];
+	uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]];
 
 
 	Message *msg = memnew_placement(buffer_end, Message);
 	Message *msg = memnew_placement(buffer_end, Message);
 
 
@@ -180,9 +187,8 @@ Error CallQueue::push_notification(ObjectID p_id, int p_notification) {
 	//msg->target;
 	//msg->target;
 	msg->notification = p_notification;
 	msg->notification = p_notification;
 
 
-	page_messages[pages_used - 1]++;
-	page_offset += room_needed;
-	mutex.unlock();
+	page_bytes[pages_used - 1] += room_needed;
+	UNLOCK_MUTEX;
 
 
 	return OK;
 	return OK;
 }
 }
@@ -205,26 +211,77 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args
 }
 }
 
 
 Error CallQueue::flush() {
 Error CallQueue::flush() {
-	mutex.lock();
+	LOCK_MUTEX;
+
+	// Non-main threads are not meant to be flushed, but appended to the main one.
+	if (this != MessageQueue::main_singleton) {
+		if (pages.size() == 0) {
+			return OK;
+		}
+
+		CallQueue *mq = MessageQueue::main_singleton;
+		DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.
+
+		mq->mutex.lock();
+
+		// Here we're transferring the data from this queue to the main one.
+		// However, it's very unlikely big amounts of messages will be queued here,
+		// so PagedArray/Pool would be overkill. Also, in most cases the data will fit
+		// an already existing page of the main queue.
+
+		// Let's see if our first (likely only) page fits the current target queue page.
+		uint32_t src_page = 0;
+		{
+			if (mq->pages_used) {
+				uint32_t dst_page = mq->pages_used - 1;
+				uint32_t dst_offset = mq->page_bytes[dst_page];
+				if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
+					memcpy(mq->pages[dst_page] + dst_offset, pages[0], page_bytes[0]);
+					src_page++;
+				}
+			}
+		}
+
+		// Any other possibly existing source page needs to be added.
+
+		if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
+			ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
+			mq->statistics();
+			mq->mutex.unlock();
+			return ERR_OUT_OF_MEMORY;
+		}
+
+		for (; src_page < pages_used; src_page++) {
+			mq->_add_page();
+			memcpy(mq->pages[mq->pages_used - 1], pages[src_page], page_bytes[src_page]);
+			mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
+		}
+
+		mq->mutex.unlock();
+
+		page_bytes[0] = 0;
+		pages_used = 1;
+
+		return OK;
+	}
 
 
 	if (pages.size() == 0) {
 	if (pages.size() == 0) {
 		// Never allocated
 		// Never allocated
-		mutex.unlock();
+		UNLOCK_MUTEX;
 		return OK; // Do nothing.
 		return OK; // Do nothing.
 	}
 	}
 
 
 	if (flushing) {
 	if (flushing) {
-		mutex.unlock();
+		UNLOCK_MUTEX;
 		return ERR_BUSY;
 		return ERR_BUSY;
 	}
 	}
 
 
 	flushing = true;
 	flushing = true;
 
 
 	uint32_t i = 0;
 	uint32_t i = 0;
-	uint32_t j = 0;
 	uint32_t offset = 0;
 	uint32_t offset = 0;
 
 
-	while (i < pages_used && j < page_messages[i]) {
+	while (i < pages_used && offset < page_bytes[i]) {
 		Page *page = pages[i];
 		Page *page = pages[i];
 
 
 		//lock on each iteration, so a call can re-add itself to the message queue
 		//lock on each iteration, so a call can re-add itself to the message queue
@@ -241,7 +298,7 @@ Error CallQueue::flush() {
 
 
 		Object *target = message->callable.get_object();
 		Object *target = message->callable.get_object();
 
 
-		mutex.unlock();
+		UNLOCK_MUTEX;
 
 
 		switch (message->type & FLAG_MASK) {
 		switch (message->type & FLAG_MASK) {
 			case TYPE_CALL: {
 			case TYPE_CALL: {
@@ -272,35 +329,32 @@ Error CallQueue::flush() {
 
 
 		message->~Message();
 		message->~Message();
 
 
-		mutex.lock();
-		j++;
-		if (j == page_messages[i]) {
-			j = 0;
+		LOCK_MUTEX;
+		if (offset == page_bytes[i]) {
 			i++;
 			i++;
 			offset = 0;
 			offset = 0;
 		}
 		}
 	}
 	}
 
 
-	page_messages[0] = 0;
-	page_offset = 0;
+	page_bytes[0] = 0;
 	pages_used = 1;
 	pages_used = 1;
 
 
 	flushing = false;
 	flushing = false;
-	mutex.unlock();
+	UNLOCK_MUTEX;
 	return OK;
 	return OK;
 }
 }
 
 
 void CallQueue::clear() {
 void CallQueue::clear() {
-	mutex.lock();
+	LOCK_MUTEX;
 
 
 	if (pages.size() == 0) {
 	if (pages.size() == 0) {
-		mutex.unlock();
+		UNLOCK_MUTEX;
 		return; // Nothing to clear.
 		return; // Nothing to clear.
 	}
 	}
 
 
 	for (uint32_t i = 0; i < pages_used; i++) {
 	for (uint32_t i = 0; i < pages_used; i++) {
 		uint32_t offset = 0;
 		uint32_t offset = 0;
-		for (uint32_t j = 0; j < page_messages[i]; j++) {
+		while (offset < page_bytes[i]) {
 			Page *page = pages[i];
 			Page *page = pages[i];
 
 
 			//lock on each iteration, so a call can re-add itself to the message queue
 			//lock on each iteration, so a call can re-add itself to the message queue
@@ -312,7 +366,6 @@ void CallQueue::clear() {
 				advance += sizeof(Variant) * message->args;
 				advance += sizeof(Variant) * message->args;
 			}
 			}
 
 
-			//pre-advance so this function is reentrant
 			offset += advance;
 			offset += advance;
 
 
 			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
 			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
@@ -327,14 +380,13 @@ void CallQueue::clear() {
 	}
 	}
 
 
 	pages_used = 1;
 	pages_used = 1;
-	page_offset = 0;
-	page_messages[0] = 0;
+	page_bytes[0] = 0;
 
 
-	mutex.unlock();
+	UNLOCK_MUTEX;
 }
 }
 
 
 void CallQueue::statistics() {
 void CallQueue::statistics() {
-	mutex.lock();
+	LOCK_MUTEX;
 	HashMap<StringName, int> set_count;
 	HashMap<StringName, int> set_count;
 	HashMap<int, int> notify_count;
 	HashMap<int, int> notify_count;
 	HashMap<Callable, int> call_count;
 	HashMap<Callable, int> call_count;
@@ -342,7 +394,7 @@ void CallQueue::statistics() {
 
 
 	for (uint32_t i = 0; i < pages_used; i++) {
 	for (uint32_t i = 0; i < pages_used; i++) {
 		uint32_t offset = 0;
 		uint32_t offset = 0;
-		for (uint32_t j = 0; j < page_messages[i]; j++) {
+		while (offset < page_bytes[i]) {
 			Page *page = pages[i];
 			Page *page = pages[i];
 
 
 			//lock on each iteration, so a call can re-add itself to the message queue
 			//lock on each iteration, so a call can re-add itself to the message queue
@@ -397,7 +449,6 @@ void CallQueue::statistics() {
 				null_count++;
 				null_count++;
 			}
 			}
 
 
-			//pre-advance so this function is reentrant
 			offset += advance;
 			offset += advance;
 
 
 			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
 			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
@@ -426,7 +477,7 @@ void CallQueue::statistics() {
 		print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value));
 		print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value));
 	}
 	}
 
 
-	mutex.unlock();
+	UNLOCK_MUTEX;
 }
 }
 
 
 bool CallQueue::is_flushing() const {
 bool CallQueue::is_flushing() const {
@@ -437,7 +488,7 @@ bool CallQueue::has_messages() const {
 	if (pages_used == 0) {
 	if (pages_used == 0) {
 		return false;
 		return false;
 	}
 	}
-	if (pages_used == 1 && page_messages[0] == 0) {
+	if (pages_used == 1 && page_bytes[0] == 0) {
 		return false;
 		return false;
 	}
 	}
 
 
@@ -473,16 +524,21 @@ CallQueue::~CallQueue() {
 
 
 //////////////////////
 //////////////////////
 
 
-MessageQueue *MessageQueue::singleton = nullptr;
+CallQueue *MessageQueue::main_singleton = nullptr;
+thread_local CallQueue *MessageQueue::thread_singleton = nullptr;
+
+void MessageQueue::set_thread_singleton_override(CallQueue *p_thread_singleton) {
+	thread_singleton = p_thread_singleton;
+}
 
 
 MessageQueue::MessageQueue() :
 MessageQueue::MessageQueue() :
 		CallQueue(nullptr,
 		CallQueue(nullptr,
 				int(GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "1,512,1,or_greater"), 32)) * 1024 * 1024 / PAGE_SIZE_BYTES,
 				int(GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "1,512,1,or_greater"), 32)) * 1024 * 1024 / PAGE_SIZE_BYTES,
 				"Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.") {
 				"Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.") {
-	ERR_FAIL_COND_MSG(singleton != nullptr, "A MessageQueue singleton already exists.");
-	singleton = this;
+	ERR_FAIL_COND_MSG(main_singleton != nullptr, "A MessageQueue singleton already exists.");
+	main_singleton = this;
 }
 }
 
 
 MessageQueue::~MessageQueue() {
 MessageQueue::~MessageQueue() {
-	singleton = nullptr;
+	main_singleton = nullptr;
 }
 }

+ 9 - 5
core/object/message_queue.h

@@ -70,10 +70,9 @@ private:
 	bool allocator_is_custom = false;
 	bool allocator_is_custom = false;
 
 
 	LocalVector<Page *> pages;
 	LocalVector<Page *> pages;
-	LocalVector<uint32_t> page_messages;
+	LocalVector<uint32_t> page_bytes;
 	uint32_t max_pages = 0;
 	uint32_t max_pages = 0;
 	uint32_t pages_used = 0;
 	uint32_t pages_used = 0;
-	uint32_t page_offset = 0;
 	bool flushing = false;
 	bool flushing = false;
 
 
 	struct Message {
 	struct Message {
@@ -88,7 +87,7 @@ private:
 	_FORCE_INLINE_ void _ensure_first_page() {
 	_FORCE_INLINE_ void _ensure_first_page() {
 		if (unlikely(pages.is_empty())) {
 		if (unlikely(pages.is_empty())) {
 			pages.push_back(allocator->alloc());
 			pages.push_back(allocator->alloc());
-			page_messages.push_back(0);
+			page_bytes.push_back(0);
 			pages_used = 1;
 			pages_used = 1;
 		}
 		}
 	}
 	}
@@ -153,10 +152,15 @@ public:
 };
 };
 
 
 class MessageQueue : public CallQueue {
 class MessageQueue : public CallQueue {
-	static MessageQueue *singleton;
+	static CallQueue *main_singleton;
+	static thread_local CallQueue *thread_singleton;
+	friend class CallQueue;
 
 
 public:
 public:
-	_FORCE_INLINE_ static MessageQueue *get_singleton() { return singleton; }
+	_FORCE_INLINE_ static CallQueue *get_singleton() { return thread_singleton ? thread_singleton : main_singleton; }
+
+	static void set_thread_singleton_override(CallQueue *p_thread_singleton);
+
 	MessageQueue();
 	MessageQueue();
 	~MessageQueue();
 	~MessageQueue();
 };
 };