CmCoreThread.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. #include "CmCoreThread.h"
  2. #include "BsThreadPool.h"
  3. #include "BsTaskScheduler.h"
  4. using namespace std::placeholders;
  5. namespace BansheeEngine
  6. {
  7. CM_THREADLOCAL CoreThread::AccessorContainer* CoreThread::mAccessor = nullptr;
  8. CoreThread::CoreThread()
  9. : mCoreThreadShutdown(false)
  10. , mCommandQueue(nullptr)
  11. , mMaxCommandNotifyId(0)
  12. , mSyncedCoreAccessor(nullptr)
  13. , mActiveFrameAlloc(0)
  14. {
  15. mFrameAllocs[0] = cm_new<FrameAlloc>();
  16. mFrameAllocs[1] = cm_new<FrameAlloc>();
  17. mCoreThreadId = CM_THREAD_CURRENT_ID;
  18. mCommandQueue = cm_new<CommandQueue<CommandQueueSync>>(CM_THREAD_CURRENT_ID);
  19. initCoreThread();
  20. }
  21. CoreThread::~CoreThread()
  22. {
  23. // TODO - What if something gets queued between the queued call to destroy_internal and this!?
  24. shutdownCoreThread();
  25. {
  26. CM_LOCK_MUTEX(mAccessorMutex);
  27. for(auto& accessor : mAccessors)
  28. {
  29. cm_delete(accessor);
  30. }
  31. mAccessors.clear();
  32. }
  33. if(mCommandQueue != nullptr)
  34. {
  35. cm_delete(mCommandQueue);
  36. mCommandQueue = nullptr;
  37. }
  38. cm_delete(mFrameAllocs[0]);
  39. cm_delete(mFrameAllocs[1]);
  40. }
  41. void CoreThread::initCoreThread()
  42. {
  43. #if !CM_FORCE_SINGLETHREADED_RENDERING
  44. #if CM_THREAD_SUPPORT
  45. ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
  46. #else
  47. CM_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
  48. #endif
  49. #endif
  50. }
  51. void CoreThread::runCoreThread()
  52. {
  53. #if !CM_FORCE_SINGLETHREADED_RENDERING
  54. TaskScheduler::instance().removeWorker(); // One less worker because we are reserving one core for this thread
  55. mCoreThreadId = CM_THREAD_CURRENT_ID;
  56. mSyncedCoreAccessor = cm_new<CoreThreadAccessor<CommandQueueSync>>(CM_THREAD_CURRENT_ID);
  57. while(true)
  58. {
  59. // Wait until we get some ready commands
  60. Queue<QueuedCommand>::type* commands = nullptr;
  61. {
  62. CM_LOCK_MUTEX_NAMED(mCommandQueueMutex, lock)
  63. while(mCommandQueue->isEmpty())
  64. {
  65. if(mCoreThreadShutdown)
  66. {
  67. cm_delete(mSyncedCoreAccessor);
  68. TaskScheduler::instance().addWorker();
  69. return;
  70. }
  71. TaskScheduler::instance().addWorker(); // Do something else while we wait, otherwise this core will be unused
  72. CM_THREAD_WAIT(mCommandReadyCondition, mCommandQueueMutex, lock);
  73. TaskScheduler::instance().removeWorker();
  74. }
  75. commands = mCommandQueue->flush();
  76. }
  77. // Play commands
  78. mCommandQueue->playbackWithNotify(commands, std::bind(&CoreThread::commandCompletedNotify, this, _1));
  79. }
  80. #endif
  81. }
  82. void CoreThread::shutdownCoreThread()
  83. {
  84. #if !CM_FORCE_SINGLETHREADED_RENDERING
  85. {
  86. CM_LOCK_MUTEX(mCommandQueueMutex);
  87. mCoreThreadShutdown = true;
  88. }
  89. // Wake all threads. They will quit after they see the shutdown flag
  90. CM_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  91. mCoreThreadId = CM_THREAD_CURRENT_ID;
  92. #endif
  93. }
  94. CoreAccessorPtr CoreThread::getAccessor()
  95. {
  96. if(mAccessor == nullptr)
  97. {
  98. CoreAccessorPtr newAccessor = cm_shared_ptr<CoreThreadAccessor<CommandQueueNoSync>>(CM_THREAD_CURRENT_ID);
  99. mAccessor = cm_new<AccessorContainer>();
  100. mAccessor->accessor = newAccessor;
  101. CM_LOCK_MUTEX(mAccessorMutex);
  102. mAccessors.push_back(mAccessor);
  103. }
  104. return mAccessor->accessor;
  105. }
  106. SyncedCoreAccessor& CoreThread::getSyncedAccessor()
  107. {
  108. return *mSyncedCoreAccessor;
  109. }
  110. void CoreThread::submitAccessors(bool blockUntilComplete)
  111. {
  112. Vector<AccessorContainer*>::type accessorCopies;
  113. {
  114. CM_LOCK_MUTEX(mAccessorMutex);
  115. accessorCopies = mAccessors;
  116. }
  117. for(auto& accessor : accessorCopies)
  118. accessor->accessor->submitToCoreThread(blockUntilComplete);
  119. mSyncedCoreAccessor->submitToCoreThread(blockUntilComplete);
  120. }
  121. AsyncOp CoreThread::queueReturnCommand(std::function<void(AsyncOp&)> commandCallback, bool blockUntilComplete)
  122. {
  123. AsyncOp op;
  124. if(CM_THREAD_CURRENT_ID == getCoreThreadId())
  125. {
  126. commandCallback(op); // Execute immediately
  127. return op;
  128. }
  129. UINT32 commandId = -1;
  130. {
  131. CM_LOCK_MUTEX(mCommandQueueMutex);
  132. if(blockUntilComplete)
  133. {
  134. commandId = mMaxCommandNotifyId++;
  135. op = mCommandQueue->queueReturn(commandCallback, true, commandId);
  136. }
  137. else
  138. op = mCommandQueue->queueReturn(commandCallback);
  139. }
  140. CM_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  141. if(blockUntilComplete)
  142. blockUntilCommandCompleted(commandId);
  143. return op;
  144. }
  145. void CoreThread::queueCommand(std::function<void()> commandCallback, bool blockUntilComplete)
  146. {
  147. if(CM_THREAD_CURRENT_ID == getCoreThreadId())
  148. {
  149. commandCallback(); // Execute immediately
  150. return;
  151. }
  152. UINT32 commandId = -1;
  153. {
  154. CM_LOCK_MUTEX(mCommandQueueMutex);
  155. if(blockUntilComplete)
  156. {
  157. commandId = mMaxCommandNotifyId++;
  158. mCommandQueue->queue(commandCallback, true, commandId);
  159. }
  160. else
  161. mCommandQueue->queue(commandCallback);
  162. }
  163. CM_THREAD_NOTIFY_ALL(mCommandReadyCondition);
  164. if(blockUntilComplete)
  165. blockUntilCommandCompleted(commandId);
  166. }
  167. void CoreThread::update()
  168. {
  169. mActiveFrameAlloc = (mActiveFrameAlloc + 1) % 2;
  170. mFrameAllocs[mActiveFrameAlloc]->clear();
  171. }
  172. FrameAlloc* CoreThread::getFrameAlloc() const
  173. {
  174. return mFrameAllocs[mActiveFrameAlloc];
  175. }
  176. void CoreThread::blockUntilCommandCompleted(UINT32 commandId)
  177. {
  178. #if !CM_FORCE_SINGLETHREADED_RENDERING
  179. CM_LOCK_MUTEX_NAMED(mCommandNotifyMutex, lock);
  180. while(true)
  181. {
  182. // TODO - This might be causing a deadlock in Release mode. I'm thinking because mCommandsCompleted isn't marked as volatile.
  183. // Check if our command id is in the completed list
  184. auto iter = mCommandsCompleted.begin();
  185. for(; iter != mCommandsCompleted.end(); ++iter)
  186. {
  187. if(*iter == commandId)
  188. break;
  189. }
  190. if(iter != mCommandsCompleted.end())
  191. {
  192. mCommandsCompleted.erase(iter);
  193. break;
  194. }
  195. CM_THREAD_WAIT(mCommandCompleteCondition, mCommandNotifyMutex, lock);
  196. }
  197. #endif
  198. }
  199. void CoreThread::commandCompletedNotify(UINT32 commandId)
  200. {
  201. {
  202. CM_LOCK_MUTEX(mCommandNotifyMutex);
  203. mCommandsCompleted.push_back(commandId);
  204. }
  205. CM_THREAD_NOTIFY_ALL(mCommandCompleteCondition);
  206. }
  207. CoreThread& gCoreThread()
  208. {
  209. return CoreThread::instance();
  210. }
  211. CoreThreadAccessor<CommandQueueNoSync>& gCoreAccessor()
  212. {
  213. return *CoreThread::instance().getAccessor();
  214. }
  215. void throwIfNotCoreThread()
  216. {
  217. #if !CM_FORCE_SINGLETHREADED_RENDERING
  218. if(CM_THREAD_CURRENT_ID != CoreThread::instance().getCoreThreadId())
  219. CM_EXCEPT(InternalErrorException, "This method can only be accessed from the core thread.");
  220. #endif
  221. }
  222. void throwIfCoreThread()
  223. {
  224. #if !CM_FORCE_SINGLETHREADED_RENDERING
  225. if(CM_THREAD_CURRENT_ID == CoreThread::instance().getCoreThreadId())
  226. CM_EXCEPT(InternalErrorException, "This method cannot be accessed from the core thread.");
  227. #endif
  228. }
  229. }