Browse Source

Adding pause/resume support to the async loader

Panagiotis Christopoulos Charitos 9 years ago
parent
commit
a64ddebd49

+ 35 - 24
include/anki/resource/AsyncLoader.h

@@ -7,27 +7,36 @@
 
 #include <anki/resource/Common.h>
 #include <anki/util/Thread.h>
+#include <anki/util/List.h>
 
 namespace anki
 {
 
+// Forward
+class AsyncLoader;
+
 /// @addtogroup resource
 /// @{
 
-/// Loader asynchronous task.
-class AsyncLoaderTask
+class AsyncLoaderTaskContext
 {
-	friend class AsyncLoader;
+public:
+	/// Pause the async loader.
+	Bool m_pause = false;
+
+	/// Resubmit the same task at the end of the queue.
+	Bool m_resubmitTask = false;
+};
 
+/// Interface for tasks for the AsyncLoader.
+class AsyncLoaderTask : public IntrusiveListEnabled<AsyncLoaderTask>
+{
 public:
 	virtual ~AsyncLoaderTask()
 	{
 	}
 
-	virtual ANKI_USE_RESULT Error operator()() = 0;
-
-private:
-	AsyncLoaderTask* m_next = nullptr;
+	virtual ANKI_USE_RESULT Error operator()(AsyncLoaderTaskContext& ctx) = 0;
 };
 
 /// Asynchronous resource loader.
@@ -38,20 +47,31 @@ public:
 
 	~AsyncLoader();
 
-	ANKI_USE_RESULT Error create(const HeapAllocator<U8>& alloc);
+	void init(const HeapAllocator<U8>& alloc);
 
 	/// Create a new asynchronous loading task.
 	template<typename TTask, typename... TArgs>
 	void newTask(TArgs&&... args);
 
+	/// Pause the loader. This method will block the main thread for the current
+	/// async task to finish. The rest of the tasks in the queue will not be
+	/// executed until resume is called.
+	void pause();
+
+	/// Resume the async loading.
+	void resume();
+
 private:
 	HeapAllocator<U8> m_alloc;
 	Thread m_thread;
+	Barrier m_barrier = {2};
+
 	Mutex m_mtx;
 	ConditionVariable m_condVar;
-	AsyncLoaderTask* m_head = nullptr;
-	AsyncLoaderTask* m_tail = nullptr;
+	IntrusiveList<AsyncLoaderTask> m_taskQueue;
 	Bool8 m_quit = false;
+	Bool8 m_paused = false;
+	Bool8 m_sync = false;
 
 	/// Thread callback
 	static ANKI_USE_RESULT Error threadCallback(Thread::Info& info);
@@ -63,7 +83,7 @@ private:
 
 //==============================================================================
 template<typename TTask, typename... TArgs>
-void AsyncLoader::newTask(TArgs&&... args)
+inline void AsyncLoader::newTask(TArgs&&... args)
 {
 	TTask* newTask =
 		m_alloc.template newInstance<TTask>(std::forward<TArgs>(args)...);
@@ -71,22 +91,13 @@ void AsyncLoader::newTask(TArgs&&... args)
 	// Append task to the list
 	{
 		LockGuard<Mutex> lock(m_mtx);
-		if(m_tail != nullptr)
-		{
-			ANKI_ASSERT(m_tail->m_next == nullptr);
-			ANKI_ASSERT(m_head != nullptr);
+		m_taskQueue.pushBack(newTask);
 
-			m_tail->m_next = newTask;
-			m_tail = newTask;
-		}
-		else
+		if(!m_paused)
 		{
-			ANKI_ASSERT(m_head == nullptr);
-			m_head = m_tail = newTask;
+			// Wake up the thread if it's not paused
+			m_condVar.notifyOne();
 		}
-
-		// Wake up the thread
-		m_condVar.notifyOne();
 	}
 }
 

+ 8 - 0
src/core/App.cpp

@@ -21,6 +21,7 @@
 #include <anki/renderer/MainRenderer.h>
 #include <anki/script/ScriptManager.h>
 #include <anki/resource/ResourceFilesystem.h>
+#include <anki/resource/AsyncLoader.h>
 
 #if ANKI_OS == ANKI_OS_ANDROID
 #include <android_native_app_glue.h>
@@ -384,7 +385,14 @@ Error App::mainLoop()
 
 		ANKI_CHECK(m_renderer->render(*m_scene));
 
+		// Pause and sync async loader. That will force all tasks before the
+		// pause to finish in this frame.
+		m_resources->getAsyncLoader().pause();
+		
 		m_gr->swapBuffers();
+		
+		// Now resume the loader
+		m_resources->getAsyncLoader().resume();
 
 		// Sleep
 		timer.stop();

+ 71 - 28
src/resource/AsyncLoader.cpp

@@ -20,28 +20,24 @@ AsyncLoader::~AsyncLoader()
 {
 	stop();
 
-	if(m_head != nullptr)
+	if(!m_taskQueue.isEmpty())
 	{
-		ANKI_ASSERT(m_tail != nullptr);
 		ANKI_LOGW("Stoping loading thread while there is work to do");
 
-		AsyncLoaderTask* task = m_head;
-
-		do
+		while(!m_taskQueue.isEmpty())
 		{
-			AsyncLoaderTask* next = task->m_next;
+			AsyncLoaderTask* task = &m_taskQueue.getFront();
+			m_taskQueue.popFront();
 			m_alloc.deleteInstance(task);
-			task = next;
-		} while(task != nullptr);
+		}
 	}
 }
 
 //==============================================================================
-Error AsyncLoader::create(const HeapAllocator<U8>& alloc)
+void AsyncLoader::init(const HeapAllocator<U8>& alloc)
 {
 	m_alloc = alloc;
 	m_thread.start(this, threadCallback);
-	return ErrorCode::NONE;
 }
 
 //==============================================================================
@@ -50,7 +46,6 @@ void AsyncLoader::stop()
 	{
 		LockGuard<Mutex> lock(m_mtx);
 		m_quit = true;
-
 		m_condVar.notifyOne();
 	}
 
@@ -58,6 +53,27 @@ void AsyncLoader::stop()
 	(void)err;
 }
 
+//==============================================================================
+void AsyncLoader::pause()
+{
+	{
+		LockGuard<Mutex> lock(m_mtx);
+		m_paused = true;
+		m_sync = true;
+		m_condVar.notifyOne();
+	}
+
+	m_barrier.wait();
+}
+
+//==============================================================================
+void AsyncLoader::resume()
+{
+	LockGuard<Mutex> lock(m_mtx);
+	m_paused = false;
+	m_condVar.notifyOne();
+}
+
 //==============================================================================
 Error AsyncLoader::threadCallback(Thread::Info& info)
 {
@@ -72,12 +88,14 @@ Error AsyncLoader::threadWorker()
 
 	while(!err)
 	{
-		AsyncLoaderTask* task;
+		AsyncLoaderTask* task = nullptr;
+		Bool quit = false;
+		Bool sync = false;
 
 		{
 			// Wait for something
 			LockGuard<Mutex> lock(m_mtx);
-			while(m_head == nullptr && m_quit == false)
+			while((m_taskQueue.isEmpty() || m_paused) && !m_quit && !m_sync)
 			{
 				m_condVar.wait(m_mtx);
 			}
@@ -85,32 +103,57 @@ Error AsyncLoader::threadWorker()
 			// Do some work
 			if(m_quit)
 			{
-				break;
+				quit = true;
 			}
-
-			task = m_head;
-
-			// Update the queue
-			if(m_head->m_next == nullptr)
+			else if(m_sync)
 			{
-				ANKI_ASSERT(m_tail == m_head);
-				m_head = m_tail = nullptr;
+				sync = true;
+				m_sync = false;
 			}
 			else
 			{
-				m_head = m_head->m_next;
+				task = &m_taskQueue.getFront();
+				m_taskQueue.popFront();
 			}
 		}
 
-		// Exec the task
-		err = (*task)();
-		if(err)
+		if(quit)
+		{
+			break;
+		}
+		else if(sync)
 		{
-			ANKI_LOGE("Async loader task failed");
+			m_barrier.wait();
 		}
+		else
+		{
+			// Exec the task
+			ANKI_ASSERT(task);
+			AsyncLoaderTaskContext ctx;
+			err = (*task)(ctx);
+			if(err)
+			{
+				ANKI_LOGE("Async loader task failed");
+			}
+
+			// Do other stuff
+			if(ctx.m_resubmitTask)
+			{
+				LockGuard<Mutex> lock(m_mtx);
+				m_taskQueue.pushBack(task);
+			}
+			else
+			{
+				// Delete the task
+				m_alloc.deleteInstance(task);
+			}
 
-		// Delete the task
-		m_alloc.deleteInstance(task);
+			if(ctx.m_pause)
+			{
+				LockGuard<Mutex> lock(m_mtx);
+				m_paused = true;
+			}
+		}
 	}
 
 	return err;

+ 2 - 4
src/resource/ResourceManager.cpp

@@ -37,8 +37,6 @@ ResourceManager::~ResourceManager()
 //==============================================================================
 Error ResourceManager::create(ResourceManagerInitInfo& init)
 {
-	Error err = ErrorCode::NONE;
-
 	m_gr = init.m_gr;
 	m_physics = init.m_physics;
 	m_fs = init.m_resourceFs;
@@ -76,9 +74,9 @@ Error ResourceManager::create(ResourceManagerInitInfo& init)
 
 	// Init the thread
 	m_asyncLoader = m_alloc.newInstance<AsyncLoader>();
-	err = m_asyncLoader->create(m_alloc);
+	m_asyncLoader->init(m_alloc);
 
-	return err;
+	return ErrorCode::NONE;
 }
 
 } // end namespace anki

+ 2 - 2
src/resource/TextureResource.cpp

@@ -37,11 +37,11 @@ public:
 	{
 	}
 
-	Error operator()() final;
+	Error operator()(AsyncLoaderTaskContext& ctx) final;
 };
 
 //==============================================================================
-Error TexUploadTask::operator()()
+Error TexUploadTask::operator()(AsyncLoaderTaskContext& ctx)
 {
 	CommandBufferPtr cmdb =
 		m_gr->newInstance<CommandBuffer>(CommandBufferInitInfo());

+ 85 - 17
tests/resource/AsyncLoader.cpp

@@ -20,16 +20,21 @@ public:
 	Barrier* m_barrier = nullptr;
 	Atomic<U32>* m_count = nullptr;
 	I32 m_id = -1;
+	Bool m_pause;
+	Bool m_resubmit;
 
-	Task(F32 time, Barrier* barrier, Atomic<U32>* count, I32 id = -1)
+	Task(F32 time, Barrier* barrier, Atomic<U32>* count, I32 id = -1, 
+		Bool pause = false, Bool resubmit = false)
 		: m_sleepTime(time)
 		, m_barrier(barrier)
 		, m_count(count)
 		, m_id(id)
+		, m_pause(pause)
+		, m_resubmit(resubmit)
 	{
 	}
 
-	Error operator()()
+	Error operator()(AsyncLoaderTaskContext& ctx)
 	{
 		if(m_count)
 		{
@@ -43,8 +48,6 @@ public:
 					return ErrorCode::FUNCTION_FAILED;
 				}
 			}
-
-			return ErrorCode::NONE;
 		}
 
 		if(m_sleepTime != 0.0)
@@ -56,6 +59,10 @@ public:
 		{
 			m_barrier->wait();
 		}
+		
+		ctx.m_pause = m_pause;
+		ctx.m_resubmitTask = m_resubmit;
+		m_resubmit = false;
 
 		return ErrorCode::NONE;
 	}
@@ -74,7 +81,7 @@ public:
 	{
 	}
 
-	Error operator()()
+	Error operator()(AsyncLoaderTaskContext& ctx)
 	{
 		void* mem = m_alloc.allocate(10);
 		if(!mem)
@@ -101,13 +108,13 @@ ANKI_TEST(Resource, AsyncLoader)
 	// Simple create destroy
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 	}
 
 	// Simple task that will finish
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 		Barrier barrier(2);
 
 		a.newTask<Task>(0.0, &barrier, nullptr);
@@ -117,31 +124,32 @@ ANKI_TEST(Resource, AsyncLoader)
 	// Many tasks that will finish
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 		Barrier barrier(2);
 		Atomic<U32> counter = {0};
+		const U COUNT = 100;
 
-		for(U i = 0; i < 100; i++)
+		for(U i = 0; i < COUNT; i++)
 		{
 			Barrier* pbarrier = nullptr;
 
-			if(i == 99)
+			if(i == COUNT - 1)
 			{
 				pbarrier = &barrier;
 			}
 
-			a.newTask<Task>(0.0, pbarrier, &counter);
+			a.newTask<Task>(0.01, pbarrier, &counter);
 		}
 
 		barrier.wait();
 
-		ANKI_TEST_EXPECT_EQ(counter.load(), 100);
+		ANKI_TEST_EXPECT_EQ(counter.load(), COUNT);
 	}
 
 	// Many tasks that will _not_ finish
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 
 		for(U i = 0; i < 100; i++)
 		{
@@ -152,7 +160,7 @@ ANKI_TEST(Resource, AsyncLoader)
 	// Tasks that allocate
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 		Barrier barrier(2);
 
 		for(U i = 0; i < 10; i++)
@@ -173,7 +181,7 @@ ANKI_TEST(Resource, AsyncLoader)
 	// Tasks that allocate and never finished
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 
 		for(U i = 0; i < 10; i++)
 		{
@@ -181,10 +189,70 @@ ANKI_TEST(Resource, AsyncLoader)
 		}
 	}
 
-	// Random struf
+	// Pause/resume
+	{
+		AsyncLoader a;
+		a.init(alloc);
+		Atomic<U32> counter(0);
+		Barrier barrier(2);
+		
+		// Check if the pause will sync
+		a.newTask<Task>(0.5, nullptr, &counter, 0);
+		HighRezTimer::sleep(0.25); // Wait for the thread to pick the task...
+		a.pause(); /// ...and then sync
+		ANKI_TEST_EXPECT_EQ(counter.load(), 1);
+		
+		// Test resume
+		a.newTask<Task>(0.1, nullptr, &counter, 1);
+		HighRezTimer::sleep(1.0);
+		ANKI_TEST_EXPECT_EQ(counter.load(), 1);
+		a.resume();
+		
+		// Sync
+		a.newTask<Task>(0.1, &barrier, &counter, 2);
+		barrier.wait();
+		
+		ANKI_TEST_EXPECT_EQ(counter.load(), 3);
+	}	
+
+	// Pause/resume
+	{
+		AsyncLoader a;
+		a.init(alloc);
+		Atomic<U32> counter(0);
+		Barrier barrier(2);
+		
+		// Check task resubmit
+		a.newTask<Task>(0.0, &barrier, &counter, -1, false, true);
+		barrier.wait();
+		barrier.wait();
+		ANKI_TEST_EXPECT_EQ(counter.load(), 2);
+		
+		// Check task pause
+		a.newTask<Task>(0.0, nullptr, &counter, -1, true, false);
+		a.newTask<Task>(0.0, nullptr, &counter, -1, false, false);
+		HighRezTimer::sleep(1.0);
+		ANKI_TEST_EXPECT_EQ(counter.load(), 3);
+		a.resume();
+		HighRezTimer::sleep(1.0);
+		ANKI_TEST_EXPECT_EQ(counter.load(), 4);
+		
+		// Check both
+		counter.set(0);
+		a.newTask<Task>(0.0, nullptr, &counter, 0, false, false);
+		a.newTask<Task>(0.0, nullptr, &counter, -1, true, true);
+		a.newTask<Task>(0.0, nullptr, &counter, 2, false, false);
+		HighRezTimer::sleep(1.0);
+		ANKI_TEST_EXPECT_EQ(counter.load(), 2);
+		a.resume();
+		HighRezTimer::sleep(1.0);
+		ANKI_TEST_EXPECT_EQ(counter.load(), 4);
+	}
+
+	// Fuzzy test
 	{
 		AsyncLoader a;
-		ANKI_TEST_EXPECT_NO_ERR(a.create(alloc));
+		a.init(alloc);
 		Barrier barrier(2);
 		Atomic<U32> counter = {0};