// ThreadPool.cpp
  1. #include "ThreadPool.h"
  2. #include "BeefPerf.h"
  3. USING_NS_BF;
  // Pool associated with the current thread: assigned at the start of
  // ThreadPool::Thread::Proc and compared against `this` in ThreadPool::IsInJob.
  // NOTE(review): BF_TLS_DECLSPEC presumably makes this thread-local storage - confirm in the platform headers.
  4. static BF_TLS_DECLSPEC ThreadPool* gPoolParent;
  5. ThreadPool::Thread::Thread()
  6. {
  7. mCurJobThreadId = -1;
  8. mActiveJob = NULL;
  9. }
  10. ThreadPool::Thread::~Thread()
  11. {
  12. BfpThread_Release(mBfpThread);
  13. }
  14. void ThreadPool::Thread::Proc()
  15. {
  16. bool isWorking = true;
  17. BpSetThreadName("ThreadPoolProc");
  18. BfpThread_SetName(NULL, "ThreadPoolProc", NULL);
  19. gPoolParent = mThreadPool;
  20. while (true)
  21. {
  22. Job* job = NULL;
  23. //
  24. {
  25. AutoCrit autoCrit(mThreadPool->mCritSect);
  26. if (!mThreadPool->mJobs.IsEmpty())
  27. {
  28. job = mThreadPool->mJobs[0];
  29. job->mProcessing = true;
  30. mThreadPool->mJobs.RemoveAt(0);
  31. }
  32. mActiveJob = job;
  33. if (job == NULL)
  34. mCurJobThreadId = -1;
  35. else
  36. mCurJobThreadId = job->mFromThreadId;
  37. bool hasWork = job != NULL;
  38. if (hasWork != isWorking)
  39. {
  40. isWorking = hasWork;
  41. if (isWorking)
  42. mThreadPool->mFreeThreads--;
  43. else
  44. mThreadPool->mFreeThreads++;
  45. }
  46. }
  47. if ((job == NULL) && (mThreadPool->mShuttingDown))
  48. {
  49. break;
  50. }
  51. bool didCancel = false;
  52. if ((mThreadPool->mShuttingDown) && (job->Cancel()))
  53. didCancel = true;
  54. if (job == NULL)
  55. {
  56. mThreadPool->mEvent.WaitFor();
  57. continue;
  58. }
  59. if (!didCancel)
  60. {
  61. BP_ZONE("ThreadProc:Job");
  62. job->Perform();
  63. }
  64. // Run dtor synchronized
  65. AutoCrit autoCrit(mThreadPool->mCritSect);
  66. mActiveJob = NULL;
  67. delete job;
  68. }
  69. }
  70. ThreadPool::ThreadPool(int maxThreads, int stackSize)
  71. {
  72. mMaxThreads = maxThreads;
  73. mShuttingDown = false;
  74. mStackSize = stackSize;
  75. mFreeThreads = 0;
  76. mRunningThreads = 0;
  77. }
  78. ThreadPool::~ThreadPool()
  79. {
  80. Shutdown();
  81. }
  82. void ThreadPool::Shutdown()
  83. {
  84. mShuttingDown = true;
  85. mEvent.Set(true);
  86. while (true)
  87. {
  88. Thread* thread = NULL;
  89. //
  90. {
  91. AutoCrit autoCrit(mCritSect);
  92. if (mThreads.IsEmpty())
  93. break;
  94. thread = mThreads.back();
  95. mThreads.pop_back();
  96. }
  97. if (!BfpThread_WaitFor(thread->mBfpThread, -1))
  98. break;
  99. AutoCrit autoCrit(mCritSect);
  100. delete thread;
  101. mRunningThreads--;
  102. }
  103. BF_ASSERT(mRunningThreads == 0);
  104. for (auto job : mJobs)
  105. delete job;
  106. mJobs.Clear();
  107. }
  108. void ThreadPool::SetStackSize(int stackSize)
  109. {
  110. mStackSize = stackSize;
  111. }
  112. static void BFP_CALLTYPE WorkerProc(void* param)
  113. {
  114. ((ThreadPool::Thread*)param)->Proc();
  115. }
  116. void ThreadPool::AddJob(Job* job, int maxWorkersPerProviderThread)
  117. {
  118. AutoCrit autoCrit(mCritSect);
  119. BfpThreadId curThreadId = BfpThread_GetCurrentId();
  120. job->mFromThreadId = curThreadId;
  121. mJobs.Add(job);
  122. mEvent.Set();
  123. if (((int)mThreads.size() < mMaxThreads) && (mFreeThreads == 0))
  124. {
  125. int workersForUs = 0;
  126. for (auto thread : mThreads)
  127. {
  128. if (thread->mCurJobThreadId == curThreadId)
  129. workersForUs++;
  130. }
  131. if (workersForUs >= maxWorkersPerProviderThread)
  132. return;
  133. mRunningThreads++;
  134. Thread* thread = new Thread();
  135. thread->mThreadPool = this;
  136. thread->mBfpThread = BfpThread_Create(WorkerProc, (void*)thread, mStackSize, BfpThreadCreateFlag_StackSizeReserve);
  137. mThreads.Add(thread);
  138. }
  139. }
  140. void ThreadPool::AddJob(BfpThreadStartProc proc, void* param, int maxWorkersPerProviderThread)
  141. {
  142. ProcJob* job = new ProcJob();
  143. job->mProc = proc;
  144. job->mParam = param;
  145. AddJob(job, maxWorkersPerProviderThread);
  146. }
  147. bool ThreadPool::IsInJob()
  148. {
  149. return gPoolParent == this;
  150. }
  151. void ThreadPool::CancelAll()
  152. {
  153. AutoCrit autoCrit(mCritSect);
  154. for (auto job : mJobs)
  155. job->Cancel();
  156. for (auto thread : mThreads)
  157. if (thread->mActiveJob != NULL)
  158. thread->mActiveJob->Cancel();
  159. }