thread_pool.odin 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package thread
  2. /*
  3. thread.Pool
  4. Copyright 2022 eisbehr
  5. Made available under Odin's BSD-3 license.
  6. */
  7. import "base:intrinsics"
  8. import "core:sync"
  9. import "core:mem"
  10. import "core:container/queue"
  11. Task_Proc :: #type proc(task: Task)
  12. Task :: struct {
  13. procedure: Task_Proc,
  14. data: rawptr,
  15. user_index: int,
  16. allocator: mem.Allocator,
  17. }
  18. // Do not access the pool's members directly while the pool threads are running,
  19. // since they use different kinds of locking and mutual exclusion devices.
  20. // Careless access can and will lead to nasty bugs. Once initialized, the
  21. // pool's memory address is not allowed to change until it is destroyed.
  22. Pool :: struct {
  23. allocator: mem.Allocator,
  24. mutex: sync.Mutex,
  25. sem_available: sync.Sema,
  26. // the following values are atomic
  27. num_waiting: int,
  28. num_in_processing: int,
  29. num_outstanding: int, // num_waiting + num_in_processing
  30. num_done: int,
  31. // end of atomics
  32. is_running: bool,
  33. threads: []^Thread,
  34. tasks: queue.Queue(Task),
  35. tasks_done: [dynamic]Task,
  36. }
  37. Pool_Thread_Data :: struct {
  38. pool: ^Pool,
  39. task: Task,
  40. }
  41. @(private="file")
  42. pool_thread_runner :: proc(t: ^Thread) {
  43. data := cast(^Pool_Thread_Data)t.data
  44. pool := data.pool
  45. for intrinsics.atomic_load(&pool.is_running) {
  46. sync.wait(&pool.sem_available)
  47. if task, ok := pool_pop_waiting(pool); ok {
  48. data.task = task
  49. pool_do_work(pool, task)
  50. sync.guard(&pool.mutex)
  51. data.task = {}
  52. }
  53. }
  54. sync.post(&pool.sem_available, 1)
  55. }
  56. // Once initialized, the pool's memory address is not allowed to change until
  57. // it is destroyed.
  58. //
  59. // The thread pool requires an allocator which it either owns, or which is thread safe.
  60. pool_init :: proc(pool: ^Pool, allocator: mem.Allocator, thread_count: int) {
  61. context.allocator = allocator
  62. pool.allocator = allocator
  63. queue.init(&pool.tasks)
  64. pool.tasks_done = make([dynamic]Task)
  65. pool.threads = make([]^Thread, max(thread_count, 1))
  66. pool.is_running = true
  67. for _, i in pool.threads {
  68. t := create(pool_thread_runner)
  69. data := new(Pool_Thread_Data)
  70. data.pool = pool
  71. t.user_index = i
  72. t.data = data
  73. pool.threads[i] = t
  74. }
  75. }
  76. pool_destroy :: proc(pool: ^Pool) {
  77. queue.destroy(&pool.tasks)
  78. delete(pool.tasks_done)
  79. for &t in pool.threads {
  80. data := cast(^Pool_Thread_Data)t.data
  81. free(data, pool.allocator)
  82. destroy(t)
  83. }
  84. delete(pool.threads, pool.allocator)
  85. }
  86. pool_start :: proc(pool: ^Pool) {
  87. for t in pool.threads {
  88. start(t)
  89. }
  90. }
  91. // Finish tasks that have already started processing, then shut down all pool
  92. // threads. Might leave over waiting tasks, any memory allocated for the
  93. // user data of those tasks will not be freed.
  94. pool_join :: proc(pool: ^Pool) {
  95. intrinsics.atomic_store(&pool.is_running, false)
  96. sync.post(&pool.sem_available, len(pool.threads))
  97. yield()
  98. started_count: int
  99. for started_count < len(pool.threads) {
  100. started_count = 0
  101. for t in pool.threads {
  102. flags := intrinsics.atomic_load(&t.flags)
  103. if .Started in flags {
  104. started_count += 1
  105. if .Joined not_in flags {
  106. join(t)
  107. }
  108. }
  109. }
  110. }
  111. }
  112. // Add a task to the thread pool.
  113. //
  114. // Tasks can be added from any thread, not just the thread that created
  115. // the thread pool. You can even add tasks from inside other tasks.
  116. //
  117. // Each task also needs an allocator which it either owns, or which is thread
  118. // safe.
  119. pool_add_task :: proc(pool: ^Pool, allocator: mem.Allocator, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
  120. sync.guard(&pool.mutex)
  121. queue.push_back(&pool.tasks, Task{
  122. procedure = procedure,
  123. data = data,
  124. user_index = user_index,
  125. allocator = allocator,
  126. })
  127. intrinsics.atomic_add(&pool.num_waiting, 1)
  128. intrinsics.atomic_add(&pool.num_outstanding, 1)
  129. sync.post(&pool.sem_available, 1)
  130. }
  131. // Forcibly stop a running task by its user index.
  132. //
  133. // This will terminate the underlying thread. Ideally, you should use some
  134. // means of communication to stop a task, as thread termination may leave
  135. // resources unclaimed.
  136. //
  137. // The thread will be restarted to accept new tasks.
  138. //
  139. // Returns true if the task was found and terminated.
  140. pool_stop_task :: proc(pool: ^Pool, user_index: int, exit_code: int = 1) -> bool {
  141. sync.guard(&pool.mutex)
  142. for t, i in pool.threads {
  143. data := cast(^Pool_Thread_Data)t.data
  144. if data.task.user_index == user_index && data.task.procedure != nil {
  145. terminate(t, exit_code)
  146. append(&pool.tasks_done, data.task)
  147. intrinsics.atomic_add(&pool.num_done, 1)
  148. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  149. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  150. old_thread_user_index := t.user_index
  151. destroy(t)
  152. replacement := create(pool_thread_runner)
  153. replacement.user_index = old_thread_user_index
  154. replacement.data = data
  155. data.task = {}
  156. pool.threads[i] = replacement
  157. start(replacement)
  158. return true
  159. }
  160. }
  161. return false
  162. }
  163. // Forcibly stop all running tasks.
  164. //
  165. // The same notes from `pool_stop_task` apply here.
  166. pool_stop_all_tasks :: proc(pool: ^Pool, exit_code: int = 1) {
  167. sync.guard(&pool.mutex)
  168. for t, i in pool.threads {
  169. data := cast(^Pool_Thread_Data)t.data
  170. if data.task.procedure != nil {
  171. terminate(t, exit_code)
  172. append(&pool.tasks_done, data.task)
  173. intrinsics.atomic_add(&pool.num_done, 1)
  174. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  175. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  176. old_thread_user_index := t.user_index
  177. destroy(t)
  178. replacement := create(pool_thread_runner)
  179. replacement.user_index = old_thread_user_index
  180. replacement.data = data
  181. data.task = {}
  182. pool.threads[i] = replacement
  183. start(replacement)
  184. }
  185. }
  186. }
  187. // Force the pool to stop all of its threads and put it into a state where
  188. // it will no longer run any more tasks.
  189. //
  190. // The pool must still be destroyed after this.
  191. pool_shutdown :: proc(pool: ^Pool, exit_code: int = 1) {
  192. intrinsics.atomic_store(&pool.is_running, false)
  193. sync.guard(&pool.mutex)
  194. for t in pool.threads {
  195. terminate(t, exit_code)
  196. data := cast(^Pool_Thread_Data)t.data
  197. if data.task.procedure != nil {
  198. append(&pool.tasks_done, data.task)
  199. intrinsics.atomic_add(&pool.num_done, 1)
  200. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  201. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  202. }
  203. }
  204. }
  205. // Number of tasks waiting to be processed. Only informational, mostly for
  206. // debugging. Don't rely on this value being consistent with other num_*
  207. // values.
  208. pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
  209. return intrinsics.atomic_load(&pool.num_waiting)
  210. }
  211. // Number of tasks currently being processed. Only informational, mostly for
  212. // debugging. Don't rely on this value being consistent with other num_*
  213. // values.
  214. pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
  215. return intrinsics.atomic_load(&pool.num_in_processing)
  216. }
  217. // Outstanding tasks are all tasks that are not done, that is, tasks that are
  218. // waiting, as well as tasks that are currently being processed. Only
  219. // informational, mostly for debugging. Don't rely on this value being
  220. // consistent with other num_* values.
  221. pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
  222. return intrinsics.atomic_load(&pool.num_outstanding)
  223. }
  224. // Number of tasks which are done processing. Only informational, mostly for
  225. // debugging. Don't rely on this value being consistent with other num_*
  226. // values.
  227. pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
  228. return intrinsics.atomic_load(&pool.num_done)
  229. }
  230. // If tasks are only being added from one thread, and this procedure is being
  231. // called from that same thread, it will reliably tell if the thread pool is
  232. // empty or not. Empty in this case means there are no tasks waiting, being
  233. // processed, or _done_.
  234. pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
  235. return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0
  236. }
  237. // Mostly for internal use.
  238. pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  239. sync.guard(&pool.mutex)
  240. if queue.len(pool.tasks) != 0 {
  241. intrinsics.atomic_sub(&pool.num_waiting, 1)
  242. intrinsics.atomic_add(&pool.num_in_processing, 1)
  243. task = queue.pop_front(&pool.tasks)
  244. got_task = true
  245. }
  246. return
  247. }
  248. // Use this to take out finished tasks.
  249. pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
  250. sync.guard(&pool.mutex)
  251. if len(pool.tasks_done) != 0 {
  252. task = pop_front(&pool.tasks_done)
  253. got_task = true
  254. intrinsics.atomic_sub(&pool.num_done, 1)
  255. }
  256. return
  257. }
  258. // Mostly for internal use.
  259. pool_do_work :: proc(pool: ^Pool, task: Task) {
  260. {
  261. context.allocator = task.allocator
  262. task.procedure(task)
  263. }
  264. sync.guard(&pool.mutex)
  265. append(&pool.tasks_done, task)
  266. intrinsics.atomic_add(&pool.num_done, 1)
  267. intrinsics.atomic_sub(&pool.num_outstanding, 1)
  268. intrinsics.atomic_sub(&pool.num_in_processing, 1)
  269. }
  270. // Process the rest of the tasks, also use this thread for processing, then join
  271. // all the pool threads.
  272. pool_finish :: proc(pool: ^Pool) {
  273. for task in pool_pop_waiting(pool) {
  274. pool_do_work(pool, task)
  275. }
  276. pool_join(pool)
  277. }