// BsCoreThread.cpp
  1. #include "BsCoreThread.h"
  2. #include "BsThreadPool.h"
  3. #include "BsTaskScheduler.h"
  4. #include "BsFrameAlloc.h"
  5. using namespace std::placeholders;
  6. namespace BansheeEngine
  7. {
  8. BS_THREADLOCAL CoreThread::AccessorContainer* CoreThread::mAccessor = nullptr;
  9. CoreThread::CoreThread()
  10. : mCoreThreadShutdown(false)
  11. , mCommandQueue(nullptr)
  12. , mMaxCommandNotifyId(0)
  13. , mSyncedCoreAccessor(nullptr)
  14. , mActiveFrameAlloc(0)
  15. {
  16. for (UINT32 i = 0; i < NUM_FRAME_ALLOCS; i++)
  17. {
  18. mFrameAllocs[i] = bs_new<FrameAlloc>();
  19. mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  20. }
  21. mCoreThreadId = BS_THREAD_CURRENT_ID;
  22. mCommandQueue = bs_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID);
  23. initCoreThread();
  24. }
  25. CoreThread::~CoreThread()
  26. {
  27. // TODO - What if something gets queued between the queued call to destroy_internal and this!?
  28. shutdownCoreThread();
  29. {
  30. BS_LOCK_MUTEX(mAccessorMutex);
  31. for(auto& accessor : mAccessors)
  32. {
  33. bs_delete(accessor);
  34. }
  35. mAccessors.clear();
  36. }
  37. if(mCommandQueue != nullptr)
  38. {
  39. bs_delete(mCommandQueue);
  40. mCommandQueue = nullptr;
  41. }
  42. for (UINT32 i = 0; i < NUM_FRAME_ALLOCS; i++)
  43. {
  44. mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  45. bs_delete(mFrameAllocs[i]);
  46. }
  47. }
  48. void CoreThread::initCoreThread()
  49. {
  50. #if !BS_FORCE_SINGLETHREADED_RENDERING
  51. #if BS_THREAD_SUPPORT
  52. mCoreThread = ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
  53. #else
  54. BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
  55. #endif
  56. #endif
  57. }
  58. void CoreThread::runCoreThread()
  59. {
  60. #if !BS_FORCE_SINGLETHREADED_RENDERING
  61. TaskScheduler::instance().removeWorker(); // One less worker because we are reserving one core for this thread
  62. mCoreThreadId = BS_THREAD_CURRENT_ID;
  63. mSyncedCoreAccessor = bs_new<CoreThreadAccessor<CommandQueueSync>>(BS_THREAD_CURRENT_ID);
  64. while(true)
  65. {
  66. // Wait until we get some ready commands
  67. Queue<QueuedCommand>* commands = nullptr;
  68. {
  69. BS_LOCK_MUTEX_NAMED(mCommandQueueMutex, lock)
  70. while(mCommandQueue->isEmpty())
  71. {
  72. if(mCoreThreadShutdown)
  73. {
  74. bs_delete(mSyncedCoreAccessor);
  75. TaskScheduler::instance().addWorker();
  76. return;
  77. }
  78. TaskScheduler::instance().addWorker(); // Do something else while we wait, otherwise this core will be unused
  79. BS_THREAD_WAIT(mCommandReadyCondition, mCommandQueueMutex, lock);
  80. TaskScheduler::instance().removeWorker();
  81. }
  82. commands = mCommandQueue->flush();
  83. }
  84. // Play commands
  85. mCommandQueue->playbackWithNotify(commands, std::bind(&CoreThread::commandCompletedNotify, this, _1));
  86. }
  87. #endif
  88. }
  89. void CoreThread::shutdownCoreThread()
  90. {
  91. #if !BS_FORCE_SINGLETHREADED_RENDERING
  92. {
  93. BS_LOCK_MUTEX(mCommandQueueMutex);
  94. mCoreThreadShutdown = true;
  95. }
  96. // Wake all threads. They will quit after they see the shutdown flag
  97. BS_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  98. mCoreThreadId = BS_THREAD_CURRENT_ID;
  99. mCoreThread.blockUntilComplete();
  100. #endif
  101. }
  102. CoreAccessorPtr CoreThread::getAccessor()
  103. {
  104. if(mAccessor == nullptr)
  105. {
  106. CoreAccessorPtr newAccessor = bs_shared_ptr<CoreThreadAccessor<CommandQueueNoSync>>(BS_THREAD_CURRENT_ID);
  107. mAccessor = bs_new<AccessorContainer>();
  108. mAccessor->accessor = newAccessor;
  109. BS_LOCK_MUTEX(mAccessorMutex);
  110. mAccessors.push_back(mAccessor);
  111. }
  112. return mAccessor->accessor;
  113. }
  114. SyncedCoreAccessor& CoreThread::getSyncedAccessor()
  115. {
  116. return *mSyncedCoreAccessor;
  117. }
  118. void CoreThread::submitAccessors(bool blockUntilComplete)
  119. {
  120. Vector<AccessorContainer*> accessorCopies;
  121. {
  122. BS_LOCK_MUTEX(mAccessorMutex);
  123. accessorCopies = mAccessors;
  124. }
  125. for(auto& accessor : accessorCopies)
  126. accessor->accessor->submitToCoreThread(blockUntilComplete);
  127. mSyncedCoreAccessor->submitToCoreThread(blockUntilComplete);
  128. }
  129. AsyncOp CoreThread::queueReturnCommand(std::function<void(AsyncOp&)> commandCallback, bool blockUntilComplete)
  130. {
  131. assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
  132. AsyncOp op;
  133. UINT32 commandId = -1;
  134. {
  135. BS_LOCK_MUTEX(mCommandQueueMutex);
  136. if(blockUntilComplete)
  137. {
  138. commandId = mMaxCommandNotifyId++;
  139. op = mCommandQueue->queueReturn(commandCallback, true, commandId);
  140. }
  141. else
  142. op = mCommandQueue->queueReturn(commandCallback);
  143. }
  144. BS_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  145. if(blockUntilComplete)
  146. blockUntilCommandCompleted(commandId);
  147. return op;
  148. }
  149. void CoreThread::queueCommand(std::function<void()> commandCallback, bool blockUntilComplete)
  150. {
  151. assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
  152. UINT32 commandId = -1;
  153. {
  154. BS_LOCK_MUTEX(mCommandQueueMutex);
  155. if(blockUntilComplete)
  156. {
  157. commandId = mMaxCommandNotifyId++;
  158. mCommandQueue->queue(commandCallback, true, commandId);
  159. }
  160. else
  161. mCommandQueue->queue(commandCallback);
  162. }
  163. BS_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  164. if(blockUntilComplete)
  165. blockUntilCommandCompleted(commandId);
  166. }
  167. void CoreThread::update()
  168. {
  169. for (UINT32 i = 0; i < NUM_FRAME_ALLOCS; i++)
  170. mFrameAllocs[i]->setOwnerThread(mCoreThreadId);
  171. mActiveFrameAlloc = (mActiveFrameAlloc + 1) % 2;
  172. mFrameAllocs[mActiveFrameAlloc]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  173. mFrameAllocs[mActiveFrameAlloc]->clear();
  174. }
  175. FrameAlloc* CoreThread::getFrameAlloc() const
  176. {
  177. return mFrameAllocs[mActiveFrameAlloc];
  178. }
  179. void CoreThread::blockUntilCommandCompleted(UINT32 commandId)
  180. {
  181. #if !BS_FORCE_SINGLETHREADED_RENDERING
  182. BS_LOCK_MUTEX_NAMED(mCommandNotifyMutex, lock);
  183. while(true)
  184. {
  185. // TODO - This might be causing a deadlock in Release mode. I'm thinking because mCommandsCompleted isn't marked as volatile.
  186. // Check if our command id is in the completed list
  187. auto iter = mCommandsCompleted.begin();
  188. for(; iter != mCommandsCompleted.end(); ++iter)
  189. {
  190. if(*iter == commandId)
  191. break;
  192. }
  193. if(iter != mCommandsCompleted.end())
  194. {
  195. mCommandsCompleted.erase(iter);
  196. break;
  197. }
  198. BS_THREAD_WAIT(mCommandCompleteCondition, mCommandNotifyMutex, lock);
  199. }
  200. #endif
  201. }
  202. void CoreThread::commandCompletedNotify(UINT32 commandId)
  203. {
  204. {
  205. BS_LOCK_MUTEX(mCommandNotifyMutex);
  206. mCommandsCompleted.push_back(commandId);
  207. }
  208. BS_THREAD_NOTIFY_ALL(mCommandCompleteCondition);
  209. }
  210. CoreThread& gCoreThread()
  211. {
  212. return CoreThread::instance();
  213. }
  214. CoreThreadAccessor<CommandQueueNoSync>& gCoreAccessor()
  215. {
  216. return *CoreThread::instance().getAccessor();
  217. }
  218. void throwIfNotCoreThread()
  219. {
  220. #if !BS_FORCE_SINGLETHREADED_RENDERING
  221. if(BS_THREAD_CURRENT_ID != CoreThread::instance().getCoreThreadId())
  222. BS_EXCEPT(InternalErrorException, "This method can only be accessed from the core thread.");
  223. #endif
  224. }
  225. void throwIfCoreThread()
  226. {
  227. #if !BS_FORCE_SINGLETHREADED_RENDERING
  228. if(BS_THREAD_CURRENT_ID == CoreThread::instance().getCoreThreadId())
  229. BS_EXCEPT(InternalErrorException, "This method cannot be accessed from the core thread.");
  230. #endif
  231. }
  232. }