thread_pool.odin 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package thread
  2. /*
  3. thread.Pool
  4. Copyright 2022 eisbehr
  5. Made available under Odin's BSD-3 license.
  6. */
  7. import "base:intrinsics"
  8. import "core:sync"
  9. import "core:mem"
  10. Task_Proc :: #type proc(task: Task)
  11. Task :: struct {
  12. procedure: Task_Proc,
  13. data: rawptr,
  14. user_index: int,
  15. allocator: mem.Allocator,
  16. }
  17. // Do not access the pool's members directly while the pool threads are running,
  18. // since they use different kinds of locking and mutual exclusion devices.
  19. // Careless access can and will lead to nasty bugs. Once initialized, the
  20. // pool's memory address is not allowed to change until it is destroyed.
  21. Pool :: struct {
  22. allocator: mem.Allocator,
  23. mutex: sync.Mutex,
  24. sem_available: sync.Sema,
  25. // the following values are atomic
  26. num_waiting: int,
  27. num_in_processing: int,
  28. num_outstanding: int, // num_waiting + num_in_processing
  29. num_done: int,
  30. // end of atomics
  31. is_running: bool,
  32. threads: []^Thread,
  33. tasks: [dynamic]Task,
  34. tasks_done: [dynamic]Task,
  35. }
  36. Pool_Thread_Data :: struct {
  37. pool: ^Pool,
  38. task: Task,
  39. }
  40. @(private="file")
  41. pool_thread_runner :: proc(t: ^Thread) {
  42. data := cast(^Pool_Thread_Data)t.data
  43. pool := data.pool
  44. for intrinsics.atomic_load(&pool.is_running) {
  45. sync.wait(&pool.sem_available)
  46. if task, ok := pool_pop_waiting(pool); ok {
  47. data.task = task
  48. pool_do_work(pool, task)
  49. sync.guard(&pool.mutex)
  50. data.task = {}
  51. }
  52. }
  53. sync.post(&pool.sem_available, 1)
  54. }
  55. // Once initialized, the pool's memory address is not allowed to change until
  56. // it is destroyed.
  57. //
  58. // The thread pool requires an allocator which it either owns, or which is thread safe.
  59. pool_init :: proc(pool: ^Pool, allocator: mem.Allocator, thread_count: int) {
  60. context.allocator = allocator
  61. pool.allocator = allocator
  62. pool.tasks = make([dynamic]Task)
  63. pool.tasks_done = make([dynamic]Task)
  64. pool.threads = make([]^Thread, max(thread_count, 1))
  65. pool.is_running = true
  66. for _, i in pool.threads {
  67. t := create(pool_thread_runner)
  68. data := new(Pool_Thread_Data)
  69. data.pool = pool
  70. t.user_index = i
  71. t.data = data
  72. pool.threads[i] = t
  73. }
  74. }
  75. pool_destroy :: proc(pool: ^Pool) {
  76. delete(pool.tasks)
  77. delete(pool.tasks_done)
  78. for &t in pool.threads {
  79. data := cast(^Pool_Thread_Data)t.data
  80. free(data, pool.allocator)
  81. destroy(t)
  82. }
  83. delete(pool.threads, pool.allocator)
  84. }
  85. pool_start :: proc(pool: ^Pool) {
  86. for t in pool.threads {
  87. start(t)
  88. }
  89. }
  90. // Finish tasks that have already started processing, then shut down all pool
  91. // threads. Might leave over waiting tasks, any memory allocated for the
  92. // user data of those tasks will not be freed.
  93. pool_join :: proc(pool: ^Pool) {
  94. intrinsics.atomic_store(&pool.is_running, false)
  95. sync.post(&pool.sem_available, len(pool.threads))
  96. yield()
  97. started_count: int
  98. for started_count < len(pool.threads) {
  99. started_count = 0
  100. for t in pool.threads {
  101. flags := intrinsics.atomic_load(&t.flags)
  102. if .Started in flags {
  103. started_count += 1
  104. if .Joined not_in flags {
  105. join(t)
  106. }
  107. }
  108. }
  109. }
  110. }
  111. // Add a task to the thread pool.
  112. //
  113. // Tasks can be added from any thread, not just the thread that created
  114. // the thread pool. You can even add tasks from inside other tasks.
  115. //
  116. // Each task also needs an allocator which it either owns, or which is thread
  117. // safe.
  118. pool_add_task :: proc(pool: ^Pool, allocator: mem.Allocator, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
  119. sync.guard(&pool.mutex)
  120. append(&pool.tasks, Task{
  121. procedure = procedure,
  122. data = data,
  123. user_index = user_index,
  124. allocator = allocator,
  125. })
  126. intrinsics.atomic_add(&pool.num_waiting, 1)
  127. intrinsics.atomic_add(&pool.num_outstanding, 1)
  128. sync.post(&pool.sem_available, 1)
  129. }
  130. // Forcibly stop a running task by its user index.
  131. //
  132. // This will terminate the underlying thread. Ideally, you should use some
  133. // means of communication to stop a task, as thread termination may leave
  134. // resources unclaimed.
  135. //
  136. // The thread will be restarted to accept new tasks.
  137. //
  138. // Returns true if the task was found and terminated.
  139. pool_stop_task :: proc(pool: ^Pool, user_index: int, exit_code: int = 1) -> bool {
  140. sync.guard(&pool.mutex)
  141. for t, i in pool.threads {
  142. data := cast(^Pool_Thread_Data)t.data
  143. if data.task.user_index == user_index && data.task.procedure != nil {
  144. terminate(t, exit_code)
  145. append(&pool.tasks_done, data.task)
  146. intrinsics.atomic_add(&pool.num_done, 1)
  147. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  148. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  149. old_thread_user_index := t.user_index
  150. destroy(t)
  151. replacement := create(pool_thread_runner)
  152. replacement.user_index = old_thread_user_index
  153. replacement.data = data
  154. data.task = {}
  155. pool.threads[i] = replacement
  156. start(replacement)
  157. return true
  158. }
  159. }
  160. return false
  161. }
  162. // Forcibly stop all running tasks.
  163. //
  164. // The same notes from `pool_stop_task` apply here.
  165. pool_stop_all_tasks :: proc(pool: ^Pool, exit_code: int = 1) {
  166. sync.guard(&pool.mutex)
  167. for t, i in pool.threads {
  168. data := cast(^Pool_Thread_Data)t.data
  169. if data.task.procedure != nil {
  170. terminate(t, exit_code)
  171. append(&pool.tasks_done, data.task)
  172. intrinsics.atomic_add(&pool.num_done, 1)
  173. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  174. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  175. old_thread_user_index := t.user_index
  176. destroy(t)
  177. replacement := create(pool_thread_runner)
  178. replacement.user_index = old_thread_user_index
  179. replacement.data = data
  180. data.task = {}
  181. pool.threads[i] = replacement
  182. start(replacement)
  183. }
  184. }
  185. }
  186. // Force the pool to stop all of its threads and put it into a state where
  187. // it will no longer run any more tasks.
  188. //
  189. // The pool must still be destroyed after this.
  190. pool_shutdown :: proc(pool: ^Pool, exit_code: int = 1) {
  191. intrinsics.atomic_store(&pool.is_running, false)
  192. sync.guard(&pool.mutex)
  193. for t in pool.threads {
  194. terminate(t, exit_code)
  195. data := cast(^Pool_Thread_Data)t.data
  196. if data.task.procedure != nil {
  197. append(&pool.tasks_done, data.task)
  198. intrinsics.atomic_add(&pool.num_done, 1)
  199. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  200. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  201. }
  202. }
  203. }
  204. // Number of tasks waiting to be processed. Only informational, mostly for
  205. // debugging. Don't rely on this value being consistent with other num_*
  206. // values.
  207. pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
  208. return intrinsics.atomic_load(&pool.num_waiting)
  209. }
  210. // Number of tasks currently being processed. Only informational, mostly for
  211. // debugging. Don't rely on this value being consistent with other num_*
  212. // values.
  213. pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
  214. return intrinsics.atomic_load(&pool.num_in_processing)
  215. }
  216. // Outstanding tasks are all tasks that are not done, that is, tasks that are
  217. // waiting, as well as tasks that are currently being processed. Only
  218. // informational, mostly for debugging. Don't rely on this value being
  219. // consistent with other num_* values.
  220. pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
  221. return intrinsics.atomic_load(&pool.num_outstanding)
  222. }
  223. // Number of tasks which are done processing. Only informational, mostly for
  224. // debugging. Don't rely on this value being consistent with other num_*
  225. // values.
  226. pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
  227. return intrinsics.atomic_load(&pool.num_done)
  228. }
  229. // If tasks are only being added from one thread, and this procedure is being
  230. // called from that same thread, it will reliably tell if the thread pool is
  231. // empty or not. Empty in this case means there are no tasks waiting, being
  232. // processed, or _done_.
  233. pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
  234. return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0
  235. }
  236. // Mostly for internal use.
  237. pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  238. sync.guard(&pool.mutex)
  239. if len(pool.tasks) != 0 {
  240. intrinsics.atomic_sub(&pool.num_waiting, 1)
  241. intrinsics.atomic_add(&pool.num_in_processing, 1)
  242. task = pop_front(&pool.tasks)
  243. got_task = true
  244. }
  245. return
  246. }
  247. // Use this to take out finished tasks.
  248. pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  249. sync.guard(&pool.mutex)
  250. if len(pool.tasks_done) != 0 {
  251. task = pop_front(&pool.tasks_done)
  252. got_task = true
  253. intrinsics.atomic_sub(&pool.num_done, 1)
  254. }
  255. return
  256. }
  257. // Mostly for internal use.
  258. pool_do_work :: proc(pool: ^Pool, task: Task) {
  259. {
  260. context.allocator = task.allocator
  261. task.procedure(task)
  262. }
  263. sync.guard(&pool.mutex)
  264. append(&pool.tasks_done, task)
  265. intrinsics.atomic_add(&pool.num_done, 1)
  266. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  267. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  268. }
  269. // Process the rest of the tasks, also use this thread for processing, then join
  270. // all the pool threads.
  271. pool_finish :: proc(pool: ^Pool) {
  272. for task in pool_pop_waiting(pool) {
  273. pool_do_work(pool, task)
  274. }
  275. pool_join(pool)
  276. }