thread_pool.odin 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package thread
  2. /*
  3. thread.Pool
  4. Copyright 2022 eisbehr
  5. Made available under Odin's BSD-3 license.
  6. */
  7. import "core:intrinsics"
  8. import "core:sync"
  9. import "core:mem"
  10. Task_Proc :: #type proc(task: Task)
  11. Task :: struct {
  12. procedure: Task_Proc,
  13. data: rawptr,
  14. user_index: int,
  15. allocator: mem.Allocator,
  16. }
  17. // Do not access the pool's members directly while the pool threads are running,
  18. // since they use different kinds of locking and mutual exclusion devices.
  19. // Careless access can and will lead to nasty bugs. Once initialized, the
  20. // pool's memory address is not allowed to change until it is destroyed.
  21. Pool :: struct {
  22. allocator: mem.Allocator,
  23. mutex: sync.Mutex,
  24. sem_available: sync.Sema,
  25. // the following values are atomic
  26. num_waiting: int,
  27. num_in_processing: int,
  28. num_outstanding: int, // num_waiting + num_in_processing
  29. num_done: int,
  30. // end of atomics
  31. is_running: bool,
  32. threads: []^Thread,
  33. tasks: [dynamic]Task,
  34. tasks_done: [dynamic]Task,
  35. }
  36. // Once initialized, the pool's memory address is not allowed to change until
  37. // it is destroyed. If thread_count < 1, thread count 1 will be used.
  38. //
  39. // The thread pool requires an allocator which it either owns, or which is thread safe.
  40. pool_init :: proc(pool: ^Pool, thread_count: int, allocator: mem.Allocator) {
  41. context.allocator = allocator
  42. pool.allocator = allocator
  43. pool.tasks = make([dynamic]Task)
  44. pool.tasks_done = make([dynamic]Task)
  45. pool.threads = make([]^Thread, max(thread_count, 1))
  46. pool.is_running = true
  47. for _, i in pool.threads {
  48. t := create(proc(t: ^Thread) {
  49. pool := (^Pool)(t.data)
  50. for intrinsics.atomic_load(&pool.is_running) {
  51. sync.wait(&pool.sem_available)
  52. if task, ok := pool_pop_waiting(pool); ok {
  53. pool_do_work(pool, task)
  54. }
  55. }
  56. sync.post(&pool.sem_available, 1)
  57. })
  58. t.user_index = i
  59. t.data = pool
  60. pool.threads[i] = t
  61. }
  62. }
  63. pool_destroy :: proc(pool: ^Pool) {
  64. delete(pool.tasks)
  65. delete(pool.tasks_done)
  66. for t in &pool.threads {
  67. destroy(t)
  68. }
  69. delete(pool.threads, pool.allocator)
  70. }
  71. pool_start :: proc(pool: ^Pool) {
  72. for t in pool.threads {
  73. start(t)
  74. }
  75. }
  76. // Finish tasks that have already started processing, then shut down all pool
  77. // threads. Might leave over waiting tasks, any memory allocated for the
  78. // user data of those tasks will not be freed.
  79. pool_join :: proc(pool: ^Pool) {
  80. intrinsics.atomic_store(&pool.is_running, false)
  81. sync.post(&pool.sem_available, len(pool.threads))
  82. yield()
  83. for t in pool.threads {
  84. join(t)
  85. }
  86. }
  87. // Add a task to the thread pool.
  88. //
  89. // Tasks can be added from any thread, not just the thread that created
  90. // the thread pool. You can even add tasks from inside other tasks.
  91. //
  92. // Each task also needs an allocator which it either owns, or which is thread
  93. // safe. By default, allocations in the task are disabled by use of the
  94. // nil_allocator.
  95. pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0, allocator := context.allocator) {
  96. sync.guard(&pool.mutex)
  97. append(&pool.tasks, Task{
  98. procedure = procedure,
  99. data = data,
  100. user_index = user_index,
  101. allocator = allocator,
  102. })
  103. intrinsics.atomic_add(&pool.num_waiting, 1)
  104. intrinsics.atomic_add(&pool.num_outstanding, 1)
  105. sync.post(&pool.sem_available, 1)
  106. }
  107. // Number of tasks waiting to be processed. Only informational, mostly for
  108. // debugging. Don't rely on this value being consistent with other num_*
  109. // values.
  110. pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
  111. return intrinsics.atomic_load(&pool.num_waiting)
  112. }
  113. // Number of tasks currently being processed. Only informational, mostly for
  114. // debugging. Don't rely on this value being consistent with other num_*
  115. // values.
  116. pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
  117. return intrinsics.atomic_load(&pool.num_in_processing)
  118. }
  119. // Outstanding tasks are all tasks that are not done, that is, tasks that are
  120. // waiting, as well as tasks that are currently being processed. Only
  121. // informational, mostly for debugging. Don't rely on this value being
  122. // consistent with other num_* values.
  123. pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
  124. return intrinsics.atomic_load(&pool.num_outstanding)
  125. }
  126. // Number of tasks which are done processing. Only informational, mostly for
  127. // debugging. Don't rely on this value being consistent with other num_*
  128. // values.
  129. pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
  130. return intrinsics.atomic_load(&pool.num_done)
  131. }
  132. // If tasks are only being added from one thread, and this procedure is being
  133. // called from that same thread, it will reliably tell if the thread pool is
  134. // empty or not. Empty in this case means there are no tasks waiting, being
  135. // processed, or _done_.
  136. pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
  137. return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0
  138. }
  139. // Mostly for internal use.
  140. pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  141. sync.guard(&pool.mutex)
  142. if len(pool.tasks) != 0 {
  143. intrinsics.atomic_sub(&pool.num_waiting, 1)
  144. intrinsics.atomic_add(&pool.num_in_processing, 1)
  145. task = pop_front(&pool.tasks)
  146. got_task = true
  147. }
  148. return
  149. }
  150. // Use this to take out finished tasks.
  151. pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  152. sync.guard(&pool.mutex)
  153. if len(pool.tasks_done) != 0 {
  154. task = pop_front(&pool.tasks_done)
  155. got_task = true
  156. intrinsics.atomic_sub(&pool.num_done, 1)
  157. }
  158. return
  159. }
  160. // Mostly for internal use.
  161. pool_do_work :: proc(pool: ^Pool, task: Task) {
  162. {
  163. context.allocator = task.allocator
  164. task.procedure(task)
  165. }
  166. sync.guard(&pool.mutex)
  167. append(&pool.tasks_done, task)
  168. intrinsics.atomic_add(&pool.num_done, 1)
  169. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  170. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  171. }
  172. // Process the rest of the tasks, also use this thread for processing, then join
  173. // all the pool threads.
  174. pool_finish :: proc(pool: ^Pool) {
  175. for task in pool_pop_waiting(pool) {
  176. pool_do_work(pool, task)
  177. }
  178. pool_join(pool)
  179. }