Browse Source

Merge pull request #75940 from reduz/redone-message-queue

Redo of Message Queue
Rémi Verschelde 2 years ago
parent
commit
11798fa420
3 changed files with 344 additions and 188 deletions
  1. 296 169
      core/object/message_queue.cpp
  2. 47 18
      core/object/message_queue.h
  3. 1 1
      doc/classes/ProjectSettings.xml

+ 296 - 169
core/object/message_queue.cpp

@@ -35,94 +35,55 @@
 #include "core/object/class_db.h"
 #include "core/object/class_db.h"
 #include "core/object/script_language.h"
 #include "core/object/script_language.h"
 
 
-MessageQueue *MessageQueue::singleton = nullptr;
-
-MessageQueue *MessageQueue::get_singleton() {
-	return singleton;
-}
-
-Error MessageQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
-	return push_callablep(Callable(p_id, p_method), p_args, p_argcount, p_show_error);
-}
-
-Error MessageQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) {
-	_THREAD_SAFE_METHOD_
-
-	uint8_t room_needed = sizeof(Message) + sizeof(Variant);
-
-	if ((buffer_end + room_needed) >= buffer_size) {
-		String type;
-		if (ObjectDB::get_instance(p_id)) {
-			type = ObjectDB::get_instance(p_id)->get_class();
-		}
-		ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. Try increasing \"memory/limits/message_queue/max_size_kb\" in project settings.");
-		statistics();
-		return ERR_OUT_OF_MEMORY;
+void CallQueue::_add_page() {
+	if (pages_used == page_messages.size()) {
+		pages.push_back(allocator->alloc());
+		page_messages.push_back(0);
 	}
 	}
-
-	Message *msg = memnew_placement(&buffer[buffer_end], Message);
-	msg->args = 1;
-	msg->callable = Callable(p_id, p_prop);
-	msg->type = TYPE_SET;
-
-	buffer_end += sizeof(Message);
-
-	Variant *v = memnew_placement(&buffer[buffer_end], Variant);
-	buffer_end += sizeof(Variant);
-	*v = p_value;
-
-	return OK;
+	page_messages[pages_used] = 0;
+	pages_used++;
+	page_offset = 0;
 }
 }
 
 
-Error MessageQueue::push_notification(ObjectID p_id, int p_notification) {
-	_THREAD_SAFE_METHOD_
-
-	ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER);
-
-	uint8_t room_needed = sizeof(Message);
-
-	if ((buffer_end + room_needed) >= buffer_size) {
-		ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. Try increasing \"memory/limits/message_queue/max_size_kb\" in project settings.");
-		statistics();
-		return ERR_OUT_OF_MEMORY;
-	}
-
-	Message *msg = memnew_placement(&buffer[buffer_end], Message);
-
-	msg->type = TYPE_NOTIFICATION;
-	msg->callable = Callable(p_id, CoreStringNames::get_singleton()->notification); //name is meaningless but callable needs it
-	//msg->target;
-	msg->notification = p_notification;
-
-	buffer_end += sizeof(Message);
-
-	return OK;
+Error CallQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
+	return push_callablep(Callable(p_id, p_method), p_args, p_argcount, p_show_error);
 }
 }
 
 
-Error MessageQueue::push_callp(Object *p_object, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
+Error CallQueue::push_callp(Object *p_object, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) {
 	return push_callp(p_object->get_instance_id(), p_method, p_args, p_argcount, p_show_error);
 	return push_callp(p_object->get_instance_id(), p_method, p_args, p_argcount, p_show_error);
 }
 }
 
 
-Error MessageQueue::push_notification(Object *p_object, int p_notification) {
+Error CallQueue::push_notification(Object *p_object, int p_notification) {
 	return push_notification(p_object->get_instance_id(), p_notification);
 	return push_notification(p_object->get_instance_id(), p_notification);
 }
 }
 
 
-Error MessageQueue::push_set(Object *p_object, const StringName &p_prop, const Variant &p_value) {
+Error CallQueue::push_set(Object *p_object, const StringName &p_prop, const Variant &p_value) {
 	return push_set(p_object->get_instance_id(), p_prop, p_value);
 	return push_set(p_object->get_instance_id(), p_prop, p_value);
 }
 }
 
 
-Error MessageQueue::push_callablep(const Callable &p_callable, const Variant **p_args, int p_argcount, bool p_show_error) {
-	_THREAD_SAFE_METHOD_
+Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_args, int p_argcount, bool p_show_error) {
+	mutex.lock();
+	uint32_t room_needed = sizeof(Message) + sizeof(Variant) * p_argcount;
 
 
-	int room_needed = sizeof(Message) + sizeof(Variant) * p_argcount;
+	ERR_FAIL_COND_V_MSG(room_needed > uint32_t(PAGE_SIZE_BYTES), ERR_INVALID_PARAMETER, "Message is too large to fit on a page (" + itos(PAGE_SIZE_BYTES) + " bytes), consider passing less arguments.");
 
 
-	if ((buffer_end + room_needed) >= buffer_size) {
-		ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. Try increasing \"memory/limits/message_queue/max_size_kb\" in project settings.");
-		statistics();
-		return ERR_OUT_OF_MEMORY;
+	_ensure_first_page();
+
+	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+		if (room_needed > uint32_t(PAGE_SIZE_BYTES) || pages_used == max_pages) {
+			ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. " + error_text);
+			statistics();
+			mutex.unlock();
+			return ERR_OUT_OF_MEMORY;
+		}
+		_add_page();
 	}
 	}
 
 
-	Message *msg = memnew_placement(&buffer[buffer_end], Message);
+	Page *page = pages[pages_used - 1];
+
+	uint8_t *buffer_end = &page->data[page_offset];
+
+	Message *msg = memnew_placement(buffer_end, Message);
 	msg->args = p_argcount;
 	msg->args = p_argcount;
 	msg->callable = p_callable;
 	msg->callable = p_callable;
 	msg->type = TYPE_CALL;
 	msg->type = TYPE_CALL;
@@ -133,89 +94,95 @@ Error MessageQueue::push_callablep(const Callable &p_callable, const Variant **p
 	buffer_end += sizeof(Message);
 	buffer_end += sizeof(Message);
 
 
 	for (int i = 0; i < p_argcount; i++) {
 	for (int i = 0; i < p_argcount; i++) {
-		Variant *v = memnew_placement(&buffer[buffer_end], Variant);
+		Variant *v = memnew_placement(buffer_end, Variant);
 		buffer_end += sizeof(Variant);
 		buffer_end += sizeof(Variant);
 		*v = *p_args[i];
 		*v = *p_args[i];
 	}
 	}
 
 
+	page_messages[pages_used - 1]++;
+	page_offset += room_needed;
+
+	mutex.unlock();
+
 	return OK;
 	return OK;
 }
 }
 
 
-void MessageQueue::statistics() {
-	HashMap<StringName, int> set_count;
-	HashMap<int, int> notify_count;
-	HashMap<Callable, int> call_count;
-	int null_count = 0;
+Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) {
+	mutex.lock();
+	uint32_t room_needed = sizeof(Message) + sizeof(Variant);
 
 
-	uint32_t read_pos = 0;
-	while (read_pos < buffer_end) {
-		Message *message = (Message *)&buffer[read_pos];
+	_ensure_first_page();
 
 
-		Object *target = message->callable.get_object();
+	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+		if (pages_used == max_pages) {
+			String type;
+			if (ObjectDB::get_instance(p_id)) {
+				type = ObjectDB::get_instance(p_id)->get_class();
+			}
+			ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
+			statistics();
 
 
-		if (target != nullptr) {
-			switch (message->type & FLAG_MASK) {
-				case TYPE_CALL: {
-					if (!call_count.has(message->callable)) {
-						call_count[message->callable] = 0;
-					}
+			mutex.unlock();
+			return ERR_OUT_OF_MEMORY;
+		}
+		_add_page();
+	}
 
 
-					call_count[message->callable]++;
+	Page *page = pages[pages_used - 1];
+	uint8_t *buffer_end = &page->data[page_offset];
 
 
-				} break;
-				case TYPE_NOTIFICATION: {
-					if (!notify_count.has(message->notification)) {
-						notify_count[message->notification] = 0;
-					}
+	Message *msg = memnew_placement(buffer_end, Message);
+	msg->args = 1;
+	msg->callable = Callable(p_id, p_prop);
+	msg->type = TYPE_SET;
 
 
-					notify_count[message->notification]++;
+	buffer_end += sizeof(Message);
 
 
-				} break;
-				case TYPE_SET: {
-					StringName t = message->callable.get_method();
-					if (!set_count.has(t)) {
-						set_count[t] = 0;
-					}
+	Variant *v = memnew_placement(buffer_end, Variant);
+	*v = p_value;
 
 
-					set_count[t]++;
+	page_messages[pages_used - 1]++;
+	page_offset += room_needed;
+	mutex.unlock();
 
 
-				} break;
-			}
+	return OK;
+}
 
 
-		} else {
-			//object was deleted
-			print_line("Object was deleted while awaiting a callback");
+Error CallQueue::push_notification(ObjectID p_id, int p_notification) {
+	ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER);
+	mutex.lock();
+	uint32_t room_needed = sizeof(Message);
 
 
-			null_count++;
-		}
+	_ensure_first_page();
 
 
-		read_pos += sizeof(Message);
-		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
-			read_pos += sizeof(Variant) * message->args;
+	if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) {
+		if (pages_used == max_pages) {
+			ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text);
+			statistics();
+			mutex.unlock();
+			return ERR_OUT_OF_MEMORY;
 		}
 		}
+		_add_page();
 	}
 	}
 
 
-	print_line("TOTAL BYTES: " + itos(buffer_end));
-	print_line("NULL count: " + itos(null_count));
+	Page *page = pages[pages_used - 1];
+	uint8_t *buffer_end = &page->data[page_offset];
 
 
-	for (const KeyValue<StringName, int> &E : set_count) {
-		print_line("SET " + E.key + ": " + itos(E.value));
-	}
+	Message *msg = memnew_placement(buffer_end, Message);
 
 
-	for (const KeyValue<Callable, int> &E : call_count) {
-		print_line("CALL " + E.key + ": " + itos(E.value));
-	}
+	msg->type = TYPE_NOTIFICATION;
+	msg->callable = Callable(p_id, CoreStringNames::get_singleton()->notification); //name is meaningless but callable needs it
+	//msg->target;
+	msg->notification = p_notification;
 
 
-	for (const KeyValue<int, int> &E : notify_count) {
-		print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value));
-	}
-}
+	page_messages[pages_used - 1]++;
+	page_offset += room_needed;
+	mutex.unlock();
 
 
-int MessageQueue::get_max_buffer_usage() const {
-	return buffer_max_used;
+	return OK;
 }
 }
 
 
-void MessageQueue::_call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error) {
+void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error) {
 	const Variant **argptrs = nullptr;
 	const Variant **argptrs = nullptr;
 	if (p_argcount) {
 	if (p_argcount) {
 		argptrs = (const Variant **)alloca(sizeof(Variant *) * p_argcount);
 		argptrs = (const Variant **)alloca(sizeof(Variant *) * p_argcount);
@@ -232,26 +199,32 @@ void MessageQueue::_call_function(const Callable &p_callable, const Variant *p_a
 	}
 	}
 }
 }
 
 
-void MessageQueue::flush() {
-	if (buffer_end > buffer_max_used) {
-		buffer_max_used = buffer_end;
-	}
+Error CallQueue::flush() {
+	mutex.lock();
 
 
-	uint32_t read_pos = 0;
-
-	//using reverse locking strategy
-	_THREAD_SAFE_LOCK_
+	if (pages.size() == 0) {
+		// Never allocated
+		mutex.unlock();
+		return OK; // Do nothing.
+	}
 
 
 	if (flushing) {
 	if (flushing) {
-		_THREAD_SAFE_UNLOCK_
-		ERR_FAIL_COND(flushing); //already flushing, you did something odd
+		mutex.unlock();
+		return ERR_BUSY;
 	}
 	}
+
 	flushing = true;
 	flushing = true;
 
 
-	while (read_pos < buffer_end) {
+	uint32_t i = 0;
+	uint32_t j = 0;
+	uint32_t offset = 0;
+
+	while (i < pages_used && j < page_messages[i]) {
+		Page *page = pages[i];
+
 		//lock on each iteration, so a call can re-add itself to the message queue
 		//lock on each iteration, so a call can re-add itself to the message queue
 
 
-		Message *message = (Message *)&buffer[read_pos];
+		Message *message = (Message *)&page->data[offset];
 
 
 		uint32_t advance = sizeof(Message);
 		uint32_t advance = sizeof(Message);
 		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
 		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
@@ -259,12 +232,12 @@ void MessageQueue::flush() {
 		}
 		}
 
 
 		//pre-advance so this function is reentrant
 		//pre-advance so this function is reentrant
-		read_pos += advance;
-
-		_THREAD_SAFE_UNLOCK_
+		offset += advance;
 
 
 		Object *target = message->callable.get_object();
 		Object *target = message->callable.get_object();
 
 
+		mutex.unlock();
+
 		if (target != nullptr) {
 		if (target != nullptr) {
 			switch (message->type & FLAG_MASK) {
 			switch (message->type & FLAG_MASK) {
 				case TYPE_CALL: {
 				case TYPE_CALL: {
@@ -291,54 +264,208 @@ void MessageQueue::flush() {
 
 
 		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
 		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
 			Variant *args = (Variant *)(message + 1);
 			Variant *args = (Variant *)(message + 1);
-			for (int i = 0; i < message->args; i++) {
-				args[i].~Variant();
+			for (int k = 0; k < message->args; k++) {
+				args[k].~Variant();
 			}
 			}
 		}
 		}
 
 
 		message->~Message();
 		message->~Message();
 
 
-		_THREAD_SAFE_LOCK_
+		mutex.lock();
+		j++;
+		if (j == page_messages[i]) {
+			j = 0;
+			i++;
+			offset = 0;
+		}
 	}
 	}
 
 
-	buffer_end = 0; // reset buffer
+	page_messages[0] = 0;
+	page_offset = 0;
+	pages_used = 1;
+
 	flushing = false;
 	flushing = false;
-	_THREAD_SAFE_UNLOCK_
+	mutex.unlock();
+	return OK;
 }
 }
 
 
-bool MessageQueue::is_flushing() const {
-	return flushing;
-}
+void CallQueue::clear() {
+	mutex.lock();
 
 
-MessageQueue::MessageQueue() {
-	ERR_FAIL_COND_MSG(singleton != nullptr, "A MessageQueue singleton already exists.");
-	singleton = this;
+	if (pages.size() == 0) {
+		mutex.unlock();
+		return; // Nothing to clear.
+	}
 
 
-	buffer_size = GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_kb", PROPERTY_HINT_RANGE, "1024,4096,1,or_greater"), DEFAULT_QUEUE_SIZE_KB);
-	buffer_size *= 1024;
-	buffer = memnew_arr(uint8_t, buffer_size);
-}
+	for (uint32_t i = 0; i < pages_used; i++) {
+		uint32_t offset = 0;
+		for (uint32_t j = 0; j < page_messages[i]; j++) {
+			Page *page = pages[i];
 
 
-MessageQueue::~MessageQueue() {
-	uint32_t read_pos = 0;
+			//lock on each iteration, so a call can re-add itself to the message queue
 
 
-	while (read_pos < buffer_end) {
-		Message *message = (Message *)&buffer[read_pos];
-		Variant *args = (Variant *)(message + 1);
-		int argc = message->args;
-		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
-			for (int i = 0; i < argc; i++) {
-				args[i].~Variant();
+			Message *message = (Message *)&page->data[offset];
+
+			uint32_t advance = sizeof(Message);
+			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
+				advance += sizeof(Variant) * message->args;
 			}
 			}
+
+			//pre-advance so this function is reentrant
+			offset += advance;
+
+			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
+				Variant *args = (Variant *)(message + 1);
+				for (int k = 0; k < message->args; k++) {
+					args[k].~Variant();
+				}
+			}
+
+			message->~Message();
 		}
 		}
-		message->~Message();
+	}
 
 
-		read_pos += sizeof(Message);
-		if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
-			read_pos += sizeof(Variant) * message->args;
+	pages_used = 1;
+	page_offset = 0;
+	page_messages[0] = 0;
+
+	mutex.unlock();
+}
+
+void CallQueue::statistics() {
+	mutex.lock();
+	HashMap<StringName, int> set_count;
+	HashMap<int, int> notify_count;
+	HashMap<Callable, int> call_count;
+	int null_count = 0;
+
+	for (uint32_t i = 0; i < pages_used; i++) {
+		uint32_t offset = 0;
+		for (uint32_t j = 0; j < page_messages[i]; j++) {
+			Page *page = pages[i];
+
+			//lock on each iteration, so a call can re-add itself to the message queue
+
+			Message *message = (Message *)&page->data[offset];
+
+			uint32_t advance = sizeof(Message);
+			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
+				advance += sizeof(Variant) * message->args;
+			}
+
+			Object *target = message->callable.get_object();
+
+			if (target != nullptr) {
+				switch (message->type & FLAG_MASK) {
+					case TYPE_CALL: {
+						if (!call_count.has(message->callable)) {
+							call_count[message->callable] = 0;
+						}
+
+						call_count[message->callable]++;
+
+					} break;
+					case TYPE_NOTIFICATION: {
+						if (!notify_count.has(message->notification)) {
+							notify_count[message->notification] = 0;
+						}
+
+						notify_count[message->notification]++;
+
+					} break;
+					case TYPE_SET: {
+						StringName t = message->callable.get_method();
+						if (!set_count.has(t)) {
+							set_count[t] = 0;
+						}
+
+						set_count[t]++;
+
+					} break;
+				}
+
+			} else {
+				//object was deleted
+				print_line("Object was deleted while awaiting a callback");
+
+				null_count++;
+			}
+
+			//pre-advance so this function is reentrant
+			offset += advance;
+
+			if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) {
+				Variant *args = (Variant *)(message + 1);
+				for (int k = 0; k < message->args; k++) {
+					args[k].~Variant();
+				}
+			}
+
+			message->~Message();
 		}
 		}
 	}
 	}
 
 
+	print_line("TOTAL PAGES: " + itos(pages_used) + " (" + itos(pages_used * PAGE_SIZE_BYTES) + " bytes).");
+	print_line("NULL count: " + itos(null_count));
+
+	for (const KeyValue<StringName, int> &E : set_count) {
+		print_line("SET " + E.key + ": " + itos(E.value));
+	}
+
+	for (const KeyValue<Callable, int> &E : call_count) {
+		print_line("CALL " + E.key + ": " + itos(E.value));
+	}
+
+	for (const KeyValue<int, int> &E : notify_count) {
+		print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value));
+	}
+
+	mutex.unlock();
+}
+
+bool CallQueue::is_flushing() const {
+	return flushing;
+}
+
+int CallQueue::get_max_buffer_usage() const {
+	return pages.size() * PAGE_SIZE_BYTES;
+}
+
+CallQueue::CallQueue(Allocator *p_custom_allocator, uint32_t p_max_pages, const String &p_error_text) {
+	if (p_custom_allocator) {
+		allocator = p_custom_allocator;
+		allocator_is_custom = true;
+	} else {
+		allocator = memnew(Allocator(16)); // 16 elements per allocator page, 64kb per allocator page. Anything small will do, though.
+		allocator_is_custom = false;
+	}
+	max_pages = p_max_pages;
+	error_text = p_error_text;
+}
+
+CallQueue::~CallQueue() {
+	clear();
+	// Let go of pages.
+	for (uint32_t i = 0; i < pages.size(); i++) {
+		allocator->free(pages[i]);
+	}
+	if (!allocator_is_custom) {
+		memdelete(allocator);
+	}
+}
+
+//////////////////////
+
+MessageQueue *MessageQueue::singleton = nullptr;
+
+MessageQueue::MessageQueue() :
+		CallQueue(nullptr,
+				int(GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "1,512,1,or_greater"), 32)) * 1024 * 1024 / PAGE_SIZE_BYTES,
+				"Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.") {
+	ERR_FAIL_COND_MSG(singleton != nullptr, "A MessageQueue singleton already exists.");
+	singleton = this;
+}
+
+MessageQueue::~MessageQueue() {
 	singleton = nullptr;
 	singleton = nullptr;
-	memdelete_arr(buffer);
 }
 }

+ 47 - 18
core/object/message_queue.h

@@ -33,26 +33,45 @@
 
 
 #include "core/object/object_id.h"
 #include "core/object/object_id.h"
 #include "core/os/thread_safe.h"
 #include "core/os/thread_safe.h"
+#include "core/templates/local_vector.h"
+#include "core/templates/paged_allocator.h"
 #include "core/variant/variant.h"
 #include "core/variant/variant.h"
 
 
 class Object;
 class Object;
 
 
-class MessageQueue {
-	_THREAD_SAFE_CLASS_
-
+class CallQueue {
+public:
 	enum {
 	enum {
-		DEFAULT_QUEUE_SIZE_KB = 4096
+		PAGE_SIZE_BYTES = 4096
 	};
 	};
 
 
+private:
 	enum {
 	enum {
 		TYPE_CALL,
 		TYPE_CALL,
 		TYPE_NOTIFICATION,
 		TYPE_NOTIFICATION,
 		TYPE_SET,
 		TYPE_SET,
+		TYPE_END, // End marker.
 		FLAG_SHOW_ERROR = 1 << 14,
 		FLAG_SHOW_ERROR = 1 << 14,
-		FLAG_MASK = FLAG_SHOW_ERROR - 1
+		FLAG_MASK = FLAG_SHOW_ERROR - 1,
+	};
 
 
+	struct Page {
+		uint8_t data[PAGE_SIZE_BYTES];
 	};
 	};
 
 
+	Mutex mutex;
+	typedef PagedAllocator<Page, true> Allocator;
+
+	Allocator *allocator = nullptr;
+	bool allocator_is_custom = false;
+
+	LocalVector<Page *> pages;
+	LocalVector<uint32_t> page_messages;
+	uint32_t max_pages = 0;
+	uint32_t pages_used = 0;
+	uint32_t page_offset = 0;
+	bool flushing = false;
+
 	struct Message {
 	struct Message {
 		Callable callable;
 		Callable callable;
 		int16_t type;
 		int16_t type;
@@ -62,20 +81,21 @@ class MessageQueue {
 		};
 		};
 	};
 	};
 
 
-	uint8_t *buffer = nullptr;
-	uint32_t buffer_end = 0;
-	uint32_t buffer_max_used = 0;
-	uint32_t buffer_size = 0;
+	_FORCE_INLINE_ void _ensure_first_page() {
+		if (unlikely(pages.is_empty())) {
+			pages.push_back(allocator->alloc());
+			page_messages.push_back(0);
+			pages_used = 1;
+		}
+	}
+
+	void _add_page();
 
 
 	void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);
 	void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);
 
 
-	static MessageQueue *singleton;
-
-	bool flushing = false;
+	String error_text;
 
 
 public:
 public:
-	static MessageQueue *get_singleton();
-
 	Error push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error = false);
 	Error push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error = false);
 	template <typename... VarArgs>
 	template <typename... VarArgs>
 	Error push_call(ObjectID p_id, const StringName &p_method, VarArgs... p_args) {
 	Error push_call(ObjectID p_id, const StringName &p_method, VarArgs... p_args) {
@@ -87,9 +107,9 @@ public:
 		return push_callp(p_id, p_method, sizeof...(p_args) == 0 ? nullptr : (const Variant **)argptrs, sizeof...(p_args));
 		return push_callp(p_id, p_method, sizeof...(p_args) == 0 ? nullptr : (const Variant **)argptrs, sizeof...(p_args));
 	}
 	}
 
 
-	Error push_notification(ObjectID p_id, int p_notification);
-	Error push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value);
 	Error push_callablep(const Callable &p_callable, const Variant **p_args, int p_argcount, bool p_show_error = false);
 	Error push_callablep(const Callable &p_callable, const Variant **p_args, int p_argcount, bool p_show_error = false);
+	Error push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value);
+	Error push_notification(ObjectID p_id, int p_notification);
 
 
 	template <typename... VarArgs>
 	template <typename... VarArgs>
 	Error push_callable(const Callable &p_callable, VarArgs... p_args) {
 	Error push_callable(const Callable &p_callable, VarArgs... p_args) {
@@ -115,13 +135,22 @@ public:
 	Error push_notification(Object *p_object, int p_notification);
 	Error push_notification(Object *p_object, int p_notification);
 	Error push_set(Object *p_object, const StringName &p_prop, const Variant &p_value);
 	Error push_set(Object *p_object, const StringName &p_prop, const Variant &p_value);
 
 
+	Error flush();
+	void clear();
 	void statistics();
 	void statistics();
-	void flush();
 
 
 	bool is_flushing() const;
 	bool is_flushing() const;
-
 	int get_max_buffer_usage() const;
 	int get_max_buffer_usage() const;
 
 
+	CallQueue(Allocator *p_custom_allocator = 0, uint32_t p_max_pages = 8192, const String &p_error_text = String());
+	virtual ~CallQueue();
+};
+
+class MessageQueue : public CallQueue {
+	static MessageQueue *singleton;
+
+public:
+	_FORCE_INLINE_ static MessageQueue *get_singleton() { return singleton; }
 	MessageQueue();
 	MessageQueue();
 	~MessageQueue();
 	~MessageQueue();
 };
 };

+ 1 - 1
doc/classes/ProjectSettings.xml

@@ -1739,7 +1739,7 @@
 		<member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;">
 		<member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;">
 			Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
 			Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
 		</member>
 		</member>
-		<member name="memory/limits/message_queue/max_size_kb" type="int" setter="" getter="" default="4096">
+		<member name="memory/limits/message_queue/max_size_mb" type="int" setter="" getter="" default="32">
 			Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
 			Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
 		</member>
 		</member>
 		<member name="memory/limits/multithreaded_server/rid_pool_prealloc" type="int" setter="" getter="" default="60">
 		<member name="memory/limits/multithreaded_server/rid_pool_prealloc" type="int" setter="" getter="" default="60">