BsCoreThread.cpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. //********************************** Banshee Engine (www.banshee3d.com) **************************************************//
  2. //**************** Copyright (c) 2016 Marko Pintera ([email protected]). All rights reserved. **********************//
  3. #include "CoreThread/BsCoreThread.h"
  4. #include "Threading/BsThreadPool.h"
  5. #include "Threading/BsTaskScheduler.h"
  6. #include "BsCoreApplication.h"
  7. using namespace std::placeholders;
  8. namespace bs
  9. {
  10. CoreThread::QueueData CoreThread::mPerThreadQueue;
  11. BS_THREADLOCAL CoreThread::ThreadQueueContainer* CoreThread::QueueData::current = nullptr;
  12. CoreThread::CoreThread()
  13. : mActiveFrameAlloc(0)
  14. , mCoreThreadShutdown(false)
  15. , mCoreThreadStarted(false)
  16. , mCommandQueue(nullptr)
  17. , mMaxCommandNotifyId(0)
  18. {
  19. for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
  20. {
  21. mFrameAllocs[i] = bs_new<FrameAlloc>();
  22. mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  23. }
  24. mSimThreadId = BS_THREAD_CURRENT_ID;
  25. mCoreThreadId = mSimThreadId; // For now
  26. mCommandQueue = bs_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID);
  27. initCoreThread();
  28. }
  29. CoreThread::~CoreThread()
  30. {
  31. // TODO - What if something gets queued between the queued call to destroy_internal and this!?
  32. shutdownCoreThread();
  33. {
  34. Lock lock(mCoreQueueMutex);
  35. for(auto& queue : mAllQueues)
  36. bs_delete(queue);
  37. mAllQueues.clear();
  38. }
  39. if(mCommandQueue != nullptr)
  40. {
  41. bs_delete(mCommandQueue);
  42. mCommandQueue = nullptr;
  43. }
  44. for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
  45. {
  46. mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  47. bs_delete(mFrameAllocs[i]);
  48. }
  49. }
  50. void CoreThread::initCoreThread()
  51. {
  52. #if !BS_FORCE_SINGLETHREADED_RENDERING
  53. #if BS_THREAD_SUPPORT
  54. mCoreThread = ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
  55. // Need to wait to unsure thread ID is correctly set before continuing
  56. Lock lock(mThreadStartedMutex);
  57. while (!mCoreThreadStarted)
  58. mCoreThreadStartedCondition.wait(lock);
  59. #else
  60. BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
  61. #endif
  62. #endif
  63. }
  64. void CoreThread::runCoreThread()
  65. {
  66. #if !BS_FORCE_SINGLETHREADED_RENDERING
  67. TaskScheduler::instance().removeWorker(); // One less worker because we are reserving one core for this thread
  68. {
  69. Lock lock(mThreadStartedMutex);
  70. mCoreThreadStarted = true;
  71. mCoreThreadId = BS_THREAD_CURRENT_ID;
  72. }
  73. mCoreThreadStartedCondition.notify_one();
  74. while(true)
  75. {
  76. // Wait until we get some ready commands
  77. Queue<QueuedCommand>* commands = nullptr;
  78. {
  79. Lock lock(mCommandQueueMutex);
  80. while(mCommandQueue->isEmpty())
  81. {
  82. if(mCoreThreadShutdown)
  83. {
  84. TaskScheduler::instance().addWorker();
  85. return;
  86. }
  87. TaskScheduler::instance().addWorker(); // Do something else while we wait, otherwise this core will be unused
  88. mCommandReadyCondition.wait(lock);
  89. TaskScheduler::instance().removeWorker();
  90. }
  91. commands = mCommandQueue->flush();
  92. }
  93. // Play commands
  94. mCommandQueue->playbackWithNotify(commands, std::bind(&CoreThread::commandCompletedNotify, this, _1));
  95. }
  96. #endif
  97. }
  98. void CoreThread::shutdownCoreThread()
  99. {
  100. #if !BS_FORCE_SINGLETHREADED_RENDERING
  101. {
  102. Lock lock(mCommandQueueMutex);
  103. mCoreThreadShutdown = true;
  104. }
  105. // Wake all threads. They will quit after they see the shutdown flag
  106. mCommandReadyCondition.notify_all();
  107. mCoreThreadId = BS_THREAD_CURRENT_ID;
  108. mCoreThread.blockUntilComplete();
  109. #endif
  110. }
  111. SPtr<TCoreThreadQueue<CommandQueueNoSync>> CoreThread::getQueue()
  112. {
  113. if(mPerThreadQueue.current == nullptr)
  114. {
  115. SPtr<TCoreThreadQueue<CommandQueueNoSync>> newQueue = bs_shared_ptr_new<TCoreThreadQueue<CommandQueueNoSync>>(BS_THREAD_CURRENT_ID);
  116. mPerThreadQueue.current = bs_new<ThreadQueueContainer>();
  117. mPerThreadQueue.current->queue = newQueue;
  118. mPerThreadQueue.current->isMain = BS_THREAD_CURRENT_ID == mSimThreadId;
  119. Lock lock(mCoreQueueMutex);
  120. mAllQueues.push_back(mPerThreadQueue.current);
  121. }
  122. return mPerThreadQueue.current->queue;
  123. }
  124. void CoreThread::submitAll(bool blockUntilComplete)
  125. {
  126. Vector<ThreadQueueContainer*> queueCopies;
  127. {
  128. Lock lock(mCoreQueueMutex);
  129. queueCopies = mAllQueues;
  130. }
  131. // Submit workers first
  132. ThreadQueueContainer* mainQueue = nullptr;
  133. for (auto& queue : queueCopies)
  134. {
  135. if (!queue->isMain)
  136. queue->queue->submitToCoreThread(blockUntilComplete);
  137. else
  138. mainQueue = queue;
  139. }
  140. // Then main
  141. if (mainQueue != nullptr)
  142. mainQueue->queue->submitToCoreThread(blockUntilComplete);
  143. }
  144. void CoreThread::submit(bool blockUntilComplete)
  145. {
  146. getQueue()->submitToCoreThread(blockUntilComplete);
  147. }
  148. AsyncOp CoreThread::queueReturnCommand(std::function<void(AsyncOp&)> commandCallback, CoreThreadQueueFlags flags)
  149. {
  150. assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
  151. if (!flags.isSet(CTQF_InternalQueue))
  152. return getQueue()->queueReturnCommand(commandCallback);
  153. else
  154. {
  155. bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete);
  156. AsyncOp op;
  157. UINT32 commandId = -1;
  158. {
  159. Lock lock(mCommandQueueMutex);
  160. if (blockUntilComplete)
  161. {
  162. commandId = mMaxCommandNotifyId++;
  163. op = mCommandQueue->queueReturn(commandCallback, true, commandId);
  164. }
  165. else
  166. op = mCommandQueue->queueReturn(commandCallback);
  167. }
  168. mCommandReadyCondition.notify_all();
  169. if (blockUntilComplete)
  170. blockUntilCommandCompleted(commandId);
  171. return op;
  172. }
  173. }
  174. void CoreThread::queueCommand(std::function<void()> commandCallback, CoreThreadQueueFlags flags)
  175. {
  176. assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
  177. if (!flags.isSet(CTQF_InternalQueue))
  178. getQueue()->queueCommand(commandCallback);
  179. else
  180. {
  181. bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete);
  182. UINT32 commandId = -1;
  183. {
  184. Lock lock(mCommandQueueMutex);
  185. if (blockUntilComplete)
  186. {
  187. commandId = mMaxCommandNotifyId++;
  188. mCommandQueue->queue(commandCallback, true, commandId);
  189. }
  190. else
  191. mCommandQueue->queue(commandCallback);
  192. }
  193. mCommandReadyCondition.notify_all();
  194. if (blockUntilComplete)
  195. blockUntilCommandCompleted(commandId);
  196. }
  197. }
  198. void CoreThread::update()
  199. {
  200. for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
  201. mFrameAllocs[i]->setOwnerThread(mCoreThreadId);
  202. mActiveFrameAlloc = (mActiveFrameAlloc + 1) % 2;
  203. mFrameAllocs[mActiveFrameAlloc]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
  204. mFrameAllocs[mActiveFrameAlloc]->clear();
  205. }
  206. FrameAlloc* CoreThread::getFrameAlloc() const
  207. {
  208. return mFrameAllocs[mActiveFrameAlloc];
  209. }
  210. void CoreThread::blockUntilCommandCompleted(UINT32 commandId)
  211. {
  212. #if !BS_FORCE_SINGLETHREADED_RENDERING
  213. Lock lock(mCommandNotifyMutex);
  214. while(true)
  215. {
  216. // Check if our command id is in the completed list
  217. auto iter = mCommandsCompleted.begin();
  218. for(; iter != mCommandsCompleted.end(); ++iter)
  219. {
  220. if(*iter == commandId)
  221. break;
  222. }
  223. if(iter != mCommandsCompleted.end())
  224. {
  225. mCommandsCompleted.erase(iter);
  226. break;
  227. }
  228. mCommandCompleteCondition.wait(lock);
  229. }
  230. #endif
  231. }
  232. void CoreThread::commandCompletedNotify(UINT32 commandId)
  233. {
  234. {
  235. Lock lock(mCommandNotifyMutex);
  236. mCommandsCompleted.push_back(commandId);
  237. }
  238. mCommandCompleteCondition.notify_all();
  239. }
  240. CoreThread& gCoreThread()
  241. {
  242. return CoreThread::instance();
  243. }
  244. void throwIfNotCoreThread()
  245. {
  246. #if !BS_FORCE_SINGLETHREADED_RENDERING
  247. if(BS_THREAD_CURRENT_ID != CoreThread::instance().getCoreThreadId())
  248. BS_EXCEPT(InternalErrorException, "This method can only be accessed from the core thread.");
  249. #endif
  250. }
  251. void throwIfCoreThread()
  252. {
  253. #if !BS_FORCE_SINGLETHREADED_RENDERING
  254. if(BS_THREAD_CURRENT_ID == CoreThread::instance().getCoreThreadId())
  255. BS_EXCEPT(InternalErrorException, "This method cannot be accessed from the core thread.");
  256. #endif
  257. }
  258. }