lzham_win32_threading.cpp 4.7 KB


  1. // File: lzham_task_pool_win32.cpp
  2. // See Copyright Notice and license at the end of include/lzham.h
  3. #include "lzham_core.h"
  4. #include "lzham_win32_threading.h"
  5. #include "lzham_timer.h"
  6. #if LZHAM_USE_WIN32_API
  7. // ESENTHEL CHANGED
  8. #include <process.h>
  9. namespace lzham
  10. {
  11. task_pool::task_pool() :
  12. m_num_threads(0),
  13. m_tasks_available(0, 32767),
  14. m_num_outstanding_tasks(0),
  15. m_exit_flag(false)
  16. {
  17. utils::zero_object(m_threads);
  18. }
  19. task_pool::task_pool(uint num_threads) :
  20. m_num_threads(0),
  21. m_tasks_available(0, 32767),
  22. m_num_outstanding_tasks(0),
  23. m_exit_flag(false)
  24. {
  25. utils::zero_object(m_threads);
  26. bool status = init(num_threads);
  27. LZHAM_VERIFY(status);
  28. }
  29. task_pool::~task_pool()
  30. {
  31. deinit();
  32. }
  33. bool task_pool::init(uint num_threads)
  34. {
  35. LZHAM_ASSERT(num_threads <= cMaxThreads);
  36. num_threads = math::minimum<uint>(num_threads, cMaxThreads);
  37. deinit();
  38. bool succeeded = true;
  39. m_num_threads = 0;
  40. while (m_num_threads < num_threads)
  41. {
  42. m_threads[m_num_threads] = (HANDLE)_beginthreadex(NULL, 32768, thread_func, this, 0, NULL);
  43. LZHAM_ASSERT(m_threads[m_num_threads] != 0);
  44. if (!m_threads[m_num_threads])
  45. {
  46. succeeded = false;
  47. break;
  48. }
  49. m_num_threads++;
  50. }
  51. if (!succeeded)
  52. {
  53. deinit();
  54. return false;
  55. }
  56. return true;
  57. }
  58. void task_pool::deinit()
  59. {
  60. if (m_num_threads)
  61. {
  62. join();
  63. atomic_exchange32(&m_exit_flag, true);
  64. m_tasks_available.release(m_num_threads);
  65. for (uint i = 0; i < m_num_threads; i++)
  66. {
  67. if (m_threads[i])
  68. {
  69. for ( ; ; )
  70. {
  71. DWORD result = WaitForSingleObject(m_threads[i], 30000);
  72. if ((result == WAIT_OBJECT_0) || (result == WAIT_ABANDONED))
  73. break;
  74. }
  75. CloseHandle(m_threads[i]);
  76. m_threads[i] = NULL;
  77. }
  78. }
  79. m_num_threads = 0;
  80. atomic_exchange32(&m_exit_flag, false);
  81. }
  82. m_task_stack.clear();
  83. m_num_outstanding_tasks = 0;
  84. }
  85. bool task_pool::queue_task(task_callback_func pFunc, uint64 data, void* pData_ptr)
  86. {
  87. LZHAM_ASSERT(m_num_threads);
  88. LZHAM_ASSERT(pFunc);
  89. task tsk;
  90. tsk.m_callback = pFunc;
  91. tsk.m_data = data;
  92. tsk.m_pData_ptr = pData_ptr;
  93. tsk.m_flags = 0;
  94. if (!m_task_stack.try_push(tsk))
  95. return false;
  96. atomic_increment32(&m_num_outstanding_tasks);
  97. m_tasks_available.release(1);
  98. return true;
  99. }
  100. // It's the object's responsibility to delete pObj within the execute_task() method, if needed!
  101. bool task_pool::queue_task(executable_task* pObj, uint64 data, void* pData_ptr)
  102. {
  103. LZHAM_ASSERT(m_num_threads);
  104. LZHAM_ASSERT(pObj);
  105. task tsk;
  106. tsk.m_pObj = pObj;
  107. tsk.m_data = data;
  108. tsk.m_pData_ptr = pData_ptr;
  109. tsk.m_flags = cTaskFlagObject;
  110. if (!m_task_stack.try_push(tsk))
  111. return false;
  112. atomic_increment32(&m_num_outstanding_tasks);
  113. m_tasks_available.release(1);
  114. return true;
  115. }
  116. void task_pool::process_task(task& tsk)
  117. {
  118. if (tsk.m_flags & cTaskFlagObject)
  119. tsk.m_pObj->execute_task(tsk.m_data, tsk.m_pData_ptr);
  120. else
  121. tsk.m_callback(tsk.m_data, tsk.m_pData_ptr);
  122. atomic_decrement32(&m_num_outstanding_tasks);
  123. }
  124. void task_pool::join()
  125. {
  126. while (atomic_add32(&m_num_outstanding_tasks, 0) > 0)
  127. {
  128. task tsk;
  129. if (m_task_stack.pop(tsk))
  130. {
  131. process_task(tsk);
  132. }
  133. else
  134. {
  135. lzham_sleep(1);
  136. }
  137. }
  138. }
  139. unsigned __stdcall task_pool::thread_func(void* pContext)
  140. {
  141. task_pool* pPool = static_cast<task_pool*>(pContext);
  142. for ( ; ; )
  143. {
  144. if (!pPool->m_tasks_available.wait())
  145. break;
  146. if (pPool->m_exit_flag)
  147. break;
  148. task tsk;
  149. if (pPool->m_task_stack.pop(tsk))
  150. {
  151. pPool->process_task(tsk);
  152. }
  153. }
  154. _endthreadex(0);
  155. return 0;
  156. }
  157. static uint g_num_processors;
  158. uint lzham_get_max_helper_threads()
  159. {
  160. if (!g_num_processors)
  161. {
  162. SYSTEM_INFO system_info;
  163. GetSystemInfo(&system_info);
  164. g_num_processors = system_info.dwNumberOfProcessors;
  165. }
  166. if (g_num_processors > 1)
  167. {
  168. // use all CPU's
  169. return LZHAM_MIN(task_pool::cMaxThreads, g_num_processors - 1);
  170. }
  171. return 0;
  172. }
  173. } // namespace lzham
  174. #endif // LZHAM_USE_WIN32_API