AsyncLoader.cpp 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright (C) 2009-present, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <AnKi/Resource/AsyncLoader.h>
  6. #include <AnKi/Core/StatsSet.h>
  7. #include <AnKi/Util/Logger.h>
  8. #include <AnKi/Util/Tracer.h>
  9. #include <AnKi/Util/HighRezTimer.h>
  10. namespace anki {
  // Stat entry labelled "Async loader tasks", registered under StatCategory::kMisc with no extra flags.
  // NOTE(review): presumably this macro is what defines g_svarAsyncTasksInFlight, which submitTask() increments and
  // threadWorker() decrements further down in this file - confirm against the ANKI_SVAR definition.
  11. ANKI_SVAR(AsyncTasksInFlight, StatCategory::kMisc, "Async loader tasks", StatFlag::kNone)
  12. AsyncLoader::AsyncLoader()
  13. : m_thread("AsyncLoad")
  14. {
  15. m_thread.start(this, threadCallback);
  16. }
  17. AsyncLoader::~AsyncLoader()
  18. {
  19. stop();
  20. for(auto& queue : m_taskQueues)
  21. {
  22. if(!queue.isEmpty())
  23. {
  24. ANKI_RESOURCE_LOGW("Stoping loading thread while there is work to do");
  25. while(!queue.isEmpty())
  26. {
  27. AsyncLoaderTask* task = queue.popFront();
  28. deleteInstance(ResourceMemoryPool::getSingleton(), task);
  29. }
  30. }
  31. }
  32. }
  33. void AsyncLoader::stop()
  34. {
  35. {
  36. LockGuard<Mutex> lock(m_mtx);
  37. m_quit = true;
  38. m_condVar.notifyOne();
  39. }
  40. [[maybe_unused]] Error err = m_thread.join();
  41. }
  42. Error AsyncLoader::threadCallback(ThreadCallbackInfo& info)
  43. {
  44. AsyncLoader& self = *reinterpret_cast<AsyncLoader*>(info.m_userData);
  45. return self.threadWorker();
  46. }
  47. Error AsyncLoader::threadWorker()
  48. {
  49. Error err = Error::kNone;
  50. while(!err)
  51. {
  52. AsyncLoaderTask* task = nullptr;
  53. AsyncLoaderPriority taskPriority = AsyncLoaderPriority::kCount;
  54. Bool quit = false;
  55. // Block until there is work to do
  56. {
  57. LockGuard<Mutex> lock(m_mtx);
  58. while(m_taskQueues[AsyncLoaderPriority::kHigh].isEmpty() && m_taskQueues[AsyncLoaderPriority::kMedium].isEmpty()
  59. && m_taskQueues[AsyncLoaderPriority::kLow].isEmpty() && !m_quit)
  60. {
  61. m_condVar.wait(m_mtx);
  62. }
  63. if(m_quit)
  64. {
  65. quit = true;
  66. }
  67. else
  68. {
  69. for(AsyncLoaderPriority priority : EnumIterable<AsyncLoaderPriority>())
  70. {
  71. if(!m_taskQueues[priority].isEmpty())
  72. {
  73. task = m_taskQueues[priority].popFront();
  74. taskPriority = priority;
  75. break;
  76. }
  77. }
  78. }
  79. }
  80. if(quit)
  81. {
  82. break;
  83. }
  84. else
  85. {
  86. // Exec the task
  87. ANKI_ASSERT(task);
  88. AsyncLoaderTaskContext ctx;
  89. ctx.m_priority = taskPriority;
  90. {
  91. // HighRezTimer::sleep(250.0_ms);
  92. ANKI_TRACE_SCOPED_EVENT(RsrcAsyncTask);
  93. err = (*task)(ctx);
  94. g_svarAsyncTasksInFlight.decrement(1u);
  95. }
  96. if(!err)
  97. {
  98. m_tasksInFlightCount.fetchSub(1);
  99. }
  100. else
  101. {
  102. ANKI_RESOURCE_LOGE("Async loader task failed");
  103. }
  104. // Do other stuff
  105. if(ctx.m_resubmitTask)
  106. {
  107. LockGuard<Mutex> lock(m_mtx);
  108. m_taskQueues[ctx.m_priority].pushBack(task);
  109. }
  110. else
  111. {
  112. // Delete the task
  113. deleteInstance(ResourceMemoryPool::getSingleton(), task);
  114. }
  115. }
  116. }
  117. return err;
  118. }
  119. void AsyncLoader::submitTask(AsyncLoaderTask* task, AsyncLoaderPriority priority)
  120. {
  121. ANKI_ASSERT(task);
  122. m_tasksInFlightCount.fetchAdd(1);
  123. g_svarAsyncTasksInFlight.increment(1);
  124. LockGuard<Mutex> lock(m_mtx);
  125. m_taskQueues[priority].pushBack(task);
  126. m_condVar.notifyOne();
  127. }
  128. } // end namespace anki