Prechádzať zdrojové kódy

Refactor CallQueue flushing for clarity

Pedro J. Estébanez 2 rokov pred
rodič
commit
c85beb8106
2 zmenil súbory, kde vykonal 51 pridanie a 45 odobranie
  1. 49 45
      core/object/message_queue.cpp
  2. 2 0
      core/object/message_queue.h

+ 49 - 45
core/object/message_queue.cpp

@@ -222,62 +222,66 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args
 	}
 }
 
-Error CallQueue::flush() {
-	LOCK_MUTEX;
-
-	// Thread overrides are not meant to be flushed, but appended to the main one.
-	if (this == MessageQueue::thread_singleton) {
-		if (pages.size() == 0) {
-			return OK;
-		}
+Error CallQueue::_transfer_messages_to_main_queue() {
+	if (pages.size() == 0) {
+		return OK;
+	}
 
-		CallQueue *mq = MessageQueue::main_singleton;
-		DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.
-
-		mq->mutex.lock();
-
-		// Here we're transferring the data from this queue to the main one.
-		// However, it's very unlikely big amounts of messages will be queued here,
-		// so PagedArray/Pool would be overkill. Also, in most cases the data will fit
-		// an already existing page of the main queue.
-
-		// Let's see if our first (likely only) page fits the current target queue page.
-		uint32_t src_page = 0;
-		{
-			if (mq->pages_used) {
-				uint32_t dst_page = mq->pages_used - 1;
-				uint32_t dst_offset = mq->page_bytes[dst_page];
-				if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
-					memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
-					mq->page_bytes[dst_page] += page_bytes[0];
-					src_page++;
-				}
+	CallQueue *mq = MessageQueue::main_singleton;
+	DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.
+
+	mq->mutex.lock();
+
+	// Here we're transferring the data from this queue to the main one.
+	// However, it's very unlikely big amounts of messages will be queued here,
+	// so PagedArray/Pool would be overkill. Also, in most cases the data will fit
+	// an already existing page of the main queue.
+
+	// Let's see if our first (likely only) page fits the current target queue page.
+	uint32_t src_page = 0;
+	{
+		if (mq->pages_used) {
+			uint32_t dst_page = mq->pages_used - 1;
+			uint32_t dst_offset = mq->page_bytes[dst_page];
+			if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
+				memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
+				mq->page_bytes[dst_page] += page_bytes[0];
+				src_page++;
 			}
 		}
+	}
 
-		// Any other possibly existing source page needs to be added.
+	// Any other possibly existing source page needs to be added.
 
-		if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
-			ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
-			mq->statistics();
-			mq->mutex.unlock();
-			return ERR_OUT_OF_MEMORY;
-		}
+	if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
+		ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
+		mq->statistics();
+		mq->mutex.unlock();
+		return ERR_OUT_OF_MEMORY;
+	}
 
-		for (; src_page < pages_used; src_page++) {
-			mq->_add_page();
-			memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
-			mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
-		}
+	for (; src_page < pages_used; src_page++) {
+		mq->_add_page();
+		memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
+		mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
+	}
 
-		mq->mutex.unlock();
+	mq->mutex.unlock();
 
-		page_bytes[0] = 0;
-		pages_used = 1;
+	page_bytes[0] = 0;
+	pages_used = 1;
 
-		return OK;
+	return OK;
+}
+
+Error CallQueue::flush() {
+	// Thread overrides are not meant to be flushed, but appended to the main one.
+	if (unlikely(this == MessageQueue::thread_singleton)) {
+		return _transfer_messages_to_main_queue();
 	}
 
+	LOCK_MUTEX;
+
 	if (pages.size() == 0) {
 		// Never allocated
 		UNLOCK_MUTEX;

+ 2 - 0
core/object/message_queue.h

@@ -98,6 +98,8 @@ private:
 		}
 	}
 
+	Error _transfer_messages_to_main_queue();
+
 	void _add_page();
 
 	void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);