2
0

thread_pool.odin 9.1 KB


  1. package thread
  2. /*
  3. thread.Pool
  4. Copyright 2022 eisbehr
  5. Made available under Odin's BSD-3 license.
  6. */
  7. import "base:intrinsics"
  8. import "core:sync"
  9. import "core:mem"
  10. import "core:container/queue"
  // Signature of a procedure that can be scheduled on the pool. The procedure
  // receives the whole `Task` record by value, which gives it access to
  // `task.data`, `task.user_index` and `task.allocator`.
  Task_Proc :: #type proc(task: Task)
  // One unit of work handed to the pool.
  Task :: struct {
      procedure:  Task_Proc,     // called by `pool_do_work` with this task as its argument
      data:       rawptr,        // user pointer; the pool only stores and forwards it
      user_index: int,           // caller-chosen id; `pool_stop_task` matches on it
      allocator:  mem.Allocator, // installed as `context.allocator` while `procedure` runs
  }
  // A set of worker threads fed from a shared first-in, first-out task queue.
  //
  // Do not access the pool's members directly while the pool threads are running:
  // different members are protected by different kinds of locking and mutual
  // exclusion devices, and careless access can and will lead to nasty bugs.
  // Once initialized, the pool's memory address is not allowed to change until
  // it is destroyed.
  Pool :: struct {
      allocator:     mem.Allocator, // allocator given to `pool_init`
      mutex:         sync.Mutex,    // held while `tasks` and `tasks_done` are accessed
      sem_available: sync.Sema,     // posted once per added task; workers wait on it

      // the following values are atomic
      num_waiting:       int, // tasks queued but not yet picked up
      num_in_processing: int, // tasks picked up and not yet finished
      num_outstanding:   int, // num_waiting + num_in_processing
      num_done:          int, // finished tasks not yet taken out with `pool_pop_done`
      // end of atomics

      is_running: bool, // workers loop while this is set; read and written atomically

      threads: []^Thread, // one entry per worker thread

      tasks:      queue.Queue(Task), // waiting tasks
      tasks_done: [dynamic]Task,     // finished (or forcibly stopped) tasks
  }
  // Per-worker record stored in the worker thread's `data` pointer.
  Pool_Thread_Data :: struct {
      pool: ^Pool, // pool this worker belongs to
      task: Task,  // task currently being run; zero value (nil `procedure`) when idle
  }
  // Entry point of every pool worker thread.
  //
  // While `pool.is_running` is set, the worker sleeps on `sem_available`, tries to
  // take one waiting task, records it in its `Pool_Thread_Data`, runs it through
  // `pool_do_work`, and clears the record again while holding the pool mutex.
  //
  // On exit the semaphore is posted once more.
  // NOTE(review): that final post presumably lets another worker that is still
  // blocked in `sync.wait` wake up and see the cleared flag - confirm.
  // NOTE(review): `worker.task` is published without holding `pool.mutex`, while
  // `pool_stop_task` reads it under the mutex - confirm this is intended.
  @(private="file")
  pool_thread_runner :: proc(t: ^Thread) {
      worker := (^Pool_Thread_Data)(t.data)
      pool   := worker.pool

      for intrinsics.atomic_load(&pool.is_running) {
          sync.wait(&pool.sem_available)

          current, have_task := pool_pop_waiting(pool)
          if !have_task {
              // Woken without any queued work; go back and re-check the flag.
              continue
          }

          // Record the task so the stop procedures can find it on this worker.
          worker.task = current
          pool_do_work(pool, current)

          if sync.guard(&pool.mutex) {
              worker.task = {}
          }
      }

      sync.post(&pool.sem_available, 1)
  }
  // Set up `pool` with `max(thread_count, 1)` worker threads. The threads are
  // only created here; `pool_start` is what starts them.
  //
  // Once initialized, the pool's memory address is not allowed to change until
  // it is destroyed, because every worker record keeps a pointer to the pool.
  //
  // The thread pool requires an allocator which it either owns, or which is
  // thread safe.
  pool_init :: proc(pool: ^Pool, allocator: mem.Allocator, thread_count: int) {
      // Everything allocated below goes through the pool's allocator.
      context.allocator = allocator
      pool.allocator    = allocator

      queue.init(&pool.tasks)
      pool.tasks_done = make([dynamic]Task)

      worker_count := max(thread_count, 1)
      pool.threads  = make([]^Thread, worker_count)

      pool.is_running = true

      for index in 0..<len(pool.threads) {
          th     := create(pool_thread_runner)
          worker := new(Pool_Thread_Data)
          worker.pool = pool

          th.user_index = index
          th.data       = worker

          pool.threads[index] = th
      }
  }
  // Release everything the pool holds: the waiting-task queue, the list of
  // finished tasks, each worker's `Pool_Thread_Data`, the threads themselves,
  // and finally the thread slice.
  //
  // NOTE(review): a worker's record is freed before `destroy` is called on its
  // thread, so callers are presumably expected to have stopped the workers
  // (`pool_join` / `pool_finish` / `pool_shutdown`) first - confirm.
  pool_destroy :: proc(pool: ^Pool) {
      queue.destroy(&pool.tasks)
      delete(pool.tasks_done)

      for index in 0..<len(pool.threads) {
          th := pool.threads[index]
          free(th.data, pool.allocator)
          destroy(th)
      }

      delete(pool.threads, pool.allocator)
  }
  // Start every worker thread of the pool, in slice order.
  pool_start :: proc(pool: ^Pool) {
      for index in 0..<len(pool.threads) {
          start(pool.threads[index])
      }
  }
  // Finish tasks that have already started processing, then shut down all pool
  // threads. Might leave over waiting tasks; any memory allocated for the
  // user data of those tasks will not be freed.
  //
  // Returns immediately (without joining anything) when none of the threads has
  // the `.Started` flag set.
  pool_join :: proc(pool: ^Pool) {
      intrinsics.atomic_store(&pool.is_running, false)
      // One post per worker, so every worker blocked on the semaphore can return.
      sync.post(&pool.sem_available, len(pool.threads))

      yield()

      thread_count := len(pool.threads)

      never_started := 0
      for th in pool.threads {
          if .Started not_in intrinsics.atomic_load(&th.flags) {
              never_started += 1
          }
      }

      // most likely the user forgot to call `pool_start`
      // exit here, so we don't hang forever
      if never_started == thread_count {
          return
      }

      // Keep sweeping over the threads until one sweep sees all of them started,
      // joining each started thread that is not marked as joined yet.
      for {
          started := 0
          for th in pool.threads {
              state := intrinsics.atomic_load(&th.flags)
              if .Started not_in state {
                  continue
              }
              started += 1
              if .Joined not_in state {
                  join(th)
              }
          }
          if started >= thread_count {
              break
          }
      }
  }
  // Queue a new task on the thread pool.
  //
  // Tasks can be added from any thread, not just the thread that created
  // the thread pool. You can even add tasks from inside other tasks.
  //
  // Each task also needs an allocator which it either owns, or which is thread
  // safe.
  //
  // Inputs:
  // - pool:       the pool to add the task to
  // - allocator:  stored in the task; becomes `context.allocator` while it runs
  // - procedure:  the procedure to run
  // - data:       user pointer stored in the task
  // - user_index: caller-chosen id stored in the task (defaults to 0)
  pool_add_task :: proc(pool: ^Pool, allocator: mem.Allocator, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
      new_task := Task{
          procedure  = procedure,
          data       = data,
          user_index = user_index,
          allocator  = allocator,
      }

      if sync.guard(&pool.mutex) {
          queue.push_back(&pool.tasks, new_task)
          intrinsics.atomic_add(&pool.num_waiting, 1)
          intrinsics.atomic_add(&pool.num_outstanding, 1)
          // Wake one worker for the new task.
          sync.post(&pool.sem_available, 1)
      }
  }
  // Forcibly stop a running task by its user index.
  //
  // This will terminate the underlying thread. Ideally, you should use some
  // means of communication to stop a task, as thread termination may leave
  // resources unclaimed.
  //
  // The stopped task is appended to the list of done tasks, and the terminated
  // thread is replaced by a freshly created and started one, which takes over
  // the old thread's `user_index` and worker record. The replacement is created
  // with the pool's allocator, like the threads made in `pool_init`.
  //
  // Inputs:
  // - pool:       the pool to search
  // - user_index: the `user_index` of the task to stop; the first match wins
  // - exit_code:  passed on to `terminate`
  //
  // Returns true if the task was found and terminated.
  pool_stop_task :: proc(pool: ^Pool, user_index: int, exit_code: int = 1) -> bool {
      sync.guard(&pool.mutex)

      for t, i in pool.threads {
          data := cast(^Pool_Thread_Data)t.data

          // Skip idle workers and workers running some other task.
          if data.task.procedure == nil || data.task.user_index != user_index {
              continue
          }

          terminate(t, exit_code)

          // Account for the task as if it had finished.
          append(&pool.tasks_done, data.task)
          intrinsics.atomic_add(&pool.num_done, 1)
          intrinsics.atomic_sub(&pool.num_outstanding, 1)
          intrinsics.atomic_sub(&pool.num_in_processing, 1)
          data.task = {}

          old_thread_user_index := t.user_index
          destroy(t)

          // Create the replacement from the pool's allocator rather than from
          // whatever allocator the caller's context happens to carry.
          replacement: ^Thread
          {
              context.allocator = pool.allocator
              replacement = create(pool_thread_runner)
          }
          replacement.user_index = old_thread_user_index
          replacement.data       = data
          pool.threads[i] = replacement

          start(replacement)
          return true
      }

      return false
  }
  // Forcibly stop all running tasks.
  //
  // Every worker that currently has a task is terminated, which may leave
  // resources unclaimed; prefer some means of communication to stop a task.
  // Each stopped task is appended to the list of done tasks, and each terminated
  // thread is replaced by a freshly created and started one, which takes over the
  // old thread's `user_index` and worker record. Replacements are created with
  // the pool's allocator, like the threads made in `pool_init`.
  //
  // Inputs:
  // - pool:      the pool whose running tasks are stopped
  // - exit_code: passed on to `terminate`
  pool_stop_all_tasks :: proc(pool: ^Pool, exit_code: int = 1) {
      sync.guard(&pool.mutex)

      for t, i in pool.threads {
          data := cast(^Pool_Thread_Data)t.data

          // Idle workers are left alone.
          if data.task.procedure == nil {
              continue
          }

          terminate(t, exit_code)

          // Account for the task as if it had finished.
          append(&pool.tasks_done, data.task)
          intrinsics.atomic_add(&pool.num_done, 1)
          intrinsics.atomic_sub(&pool.num_outstanding, 1)
          intrinsics.atomic_sub(&pool.num_in_processing, 1)
          data.task = {}

          old_thread_user_index := t.user_index
          destroy(t)

          // Create the replacement from the pool's allocator rather than from
          // whatever allocator the caller's context happens to carry.
          replacement: ^Thread
          {
              context.allocator = pool.allocator
              replacement = create(pool_thread_runner)
          }
          replacement.user_index = old_thread_user_index
          replacement.data       = data
          pool.threads[i] = replacement

          start(replacement)
      }
  }
  // Force the pool to stop all of its threads and put it into a state where
  // it will no longer run any more tasks.
  //
  // Every thread is terminated. A task that was running on a terminated thread
  // is appended to the list of done tasks and the worker's task record is
  // cleared, so that calling this procedure (or one of the stop procedures)
  // again does not account for the same task a second time.
  //
  // The pool must still be destroyed after this.
  //
  // Inputs:
  // - pool:      the pool to shut down
  // - exit_code: passed on to `terminate`
  pool_shutdown :: proc(pool: ^Pool, exit_code: int = 1) {
      intrinsics.atomic_store(&pool.is_running, false)
      sync.guard(&pool.mutex)

      for t in pool.threads {
          terminate(t, exit_code)

          data := cast(^Pool_Thread_Data)t.data
          if data.task.procedure == nil {
              continue
          }

          append(&pool.tasks_done, data.task)
          intrinsics.atomic_add(&pool.num_done, 1)
          intrinsics.atomic_sub(&pool.num_outstanding, 1)
          intrinsics.atomic_sub(&pool.num_in_processing, 1)

          // The task is now recorded as done; drop the stale reference.
          data.task = {}
      }
  }
  // Atomically read how many tasks are queued and not yet picked up by a worker.
  // Only informational, mostly for debugging. Don't rely on this value being
  // consistent with other num_* values.
  pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
      waiting := intrinsics.atomic_load(&pool.num_waiting)
      return waiting
  }
  // Atomically read how many tasks are being processed right now.
  // Only informational, mostly for debugging. Don't rely on this value being
  // consistent with other num_* values.
  pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
      in_processing := intrinsics.atomic_load(&pool.num_in_processing)
      return in_processing
  }
  // Atomically read the number of outstanding tasks. Outstanding tasks are all
  // tasks that are not done, that is, tasks that are waiting, as well as tasks
  // that are currently being processed. Only informational, mostly for
  // debugging. Don't rely on this value being consistent with other num_* values.
  pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
      outstanding := intrinsics.atomic_load(&pool.num_outstanding)
      return outstanding
  }
  // Atomically read how many tasks are done processing and have not been taken
  // out with `pool_pop_done` yet. Only informational, mostly for debugging.
  // Don't rely on this value being consistent with other num_* values.
  pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
      done := intrinsics.atomic_load(&pool.num_done)
      return done
  }
  // Report whether the pool holds no tasks at all. Empty in this case means
  // there are no tasks waiting, being processed, or _done_.
  //
  // If tasks are only being added from one thread, and this procedure is being
  // called from that same thread, it will reliably tell if the thread pool is
  // empty or not.
  pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
      if pool_num_outstanding(pool) != 0 {
          return false
      }
      return pool_num_done(pool) == 0
  }
  // Mostly for internal use.
  //
  // Take the oldest waiting task off the queue while holding the pool mutex and
  // move its count from "waiting" to "in processing".
  //
  // Returns:
  // - task:     the task taken, or the zero value if the queue was empty
  // - got_task: true if a task was taken
  pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
      sync.guard(&pool.mutex)

      if queue.len(pool.tasks) == 0 {
          return
      }

      intrinsics.atomic_sub(&pool.num_waiting, 1)
      intrinsics.atomic_add(&pool.num_in_processing, 1)
      task     = queue.pop_front(&pool.tasks)
      got_task = true
      return
  }
  // Use this to take out finished tasks, oldest first.
  //
  // Returns:
  // - task:     the finished task, or the zero value if there was none
  // - got_task: true if a task was taken out
  pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
      sync.guard(&pool.mutex)

      if len(pool.tasks_done) == 0 {
          return
      }

      task     = pop_front(&pool.tasks_done)
      got_task = true
      intrinsics.atomic_sub(&pool.num_done, 1)
      return
  }
  // Mostly for internal use.
  //
  // Run `task.procedure` on the calling thread with `task.allocator` installed
  // as `context.allocator`, then, under the pool mutex, append the task to the
  // list of done tasks and update the counters.
  pool_do_work :: proc(pool: ^Pool, task: Task) {
      {
          // The allocator override only applies to the task procedure itself.
          context.allocator = task.allocator
          task.procedure(task)
      }

      if sync.guard(&pool.mutex) {
          append(&pool.tasks_done, task)
          intrinsics.atomic_add(&pool.num_done, 1)
          intrinsics.atomic_sub(&pool.num_outstanding, 1)
          intrinsics.atomic_sub(&pool.num_in_processing, 1)
      }
  }
  // Process the rest of the tasks, using the calling thread for processing as
  // well, then join all the pool threads.
  pool_finish :: proc(pool: ^Pool) {
      for {
          task, got_task := pool_pop_waiting(pool)
          if !got_task {
              break
          }
          pool_do_work(pool, task)
      }

      pool_join(pool)
  }