thread_pool.odin 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package thread
  2. import "intrinsics"
  3. import "core:sync"
  4. import "core:mem"
  // Task_Status enumerates the states a task can be tagged with.
  // NOTE(review): nothing in the visible part of this file reads or writes
  // this enum; the meaning of each state is inferred from its name only.
  Task_Status :: enum i32 {
      Ready   = 0,
      Busy    = 1,
      Waiting = 2,
      Term    = 3,
  }
  // Task_Proc is the signature of a unit of work. The procedure receives a
  // pointer to the Task it was queued with, giving it access to `data` and
  // `user_index`.
  Task_Proc :: #type proc(task: ^Task);

  // Task is one queued unit of work for a Pool.
  Task :: struct {
      // Procedure invoked by pool_do_work with a pointer to this Task.
      procedure:  Task_Proc,
      // Opaque caller-supplied pointer; the pool never dereferences it.
      data:       rawptr,
      // Caller-supplied integer carried along with the task (0 by default in
      // pool_add_task).
      user_index: int,
  }
  // Task_Id is a distinct 32-bit integer type for identifying tasks.
  Task_Id :: distinct i32;

  // Sentinel Task_Id (-1).
  // NOTE(review): not referenced by any procedure visible in this file.
  INVALID_TASK_ID : Task_Id : -1;
  // Pool is a fixed set of worker threads draining a shared queue of Tasks.
  Pool :: struct {
      // Allocator recorded by pool_init; used to allocate and later delete the
      // `threads` slice.
      allocator: mem.Allocator,

      // Held while `tasks` is appended to or popped from.
      mutex: sync.Mutex,
      // Workers wait on this; it is posted once per task added and once per
      // worker when the pool is joined.
      sem_available: sync.Semaphore,

      // Number of tasks popped but not yet finished. Updated and read with
      // intrinsics.atomic_add / atomic_sub / atomic_load.
      processing_task_count: int,
      // Worker loops run while this is true; pool_join clears it.
      is_running: bool,

      // One entry per worker, filled in by pool_init.
      threads: []^Thread,
      // Pending tasks; new tasks are appended and index 0 is popped first.
      tasks: [dynamic]Task,
  }
  // pool_init prepares `pool` for use with `thread_count` worker threads.
  //
  // It records `allocator` on the pool (and makes it the context allocator for
  // the allocations performed here), creates the task queue and the thread
  // slice, initialises the mutex and semaphore, marks the pool as running and
  // creates one thread object per slot. The threads are only created here;
  // pool_start is the procedure that calls `start` on them.
  pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
      // Body executed by every worker: block on the semaphore, try to take one
      // task, run it, and repeat until the pool is no longer running.
      worker_main :: proc(worker: ^Thread) {
          owner := (^Pool)(worker.data);

          for owner.is_running {
              sync.semaphore_wait_for(&owner.sem_available);

              task, found := pool_try_and_pop_task(owner);
              if found {
                  pool_do_work(owner, &task);
              }
          }

          // On the way out, post once more so another waiter is released too.
          sync.semaphore_post(&owner.sem_available, 1);
      }

      context.allocator = allocator;
      pool.allocator = allocator;

      sync.mutex_init(&pool.mutex);
      sync.semaphore_init(&pool.sem_available);

      pool.tasks = make([dynamic]Task);
      pool.threads = make([]^Thread, thread_count);

      // Must be set before any worker can observe it in its loop condition.
      pool.is_running = true;

      for slot := 0; slot < len(pool.threads); slot += 1 {
          worker := create(worker_main);
          worker.user_index = slot;
          worker.data = pool;
          pool.threads[slot] = worker;
      }
  }
  // pool_destroy releases the resources owned directly by `pool`: the mutex,
  // the semaphore, the task queue and the slice holding the thread pointers.
  //
  // NOTE(review): only the slice of ^Thread is deleted; the Thread objects
  // returned by `create` in pool_init are not released here. Confirm whether
  // callers are expected to do that, or whether this is a leak.
  pool_destroy :: proc(pool: ^Pool) {
      sync.mutex_destroy(&pool.mutex);
      sync.semaphore_destroy(&pool.sem_available);

      delete(pool.threads, pool.allocator);
      delete(pool.tasks);
  }
  // pool_start calls `start` on every thread created by pool_init, in slot
  // order.
  pool_start :: proc(pool: ^Pool) {
      for slot := 0; slot < len(pool.threads); slot += 1 {
          start(pool.threads[slot]);
      }
  }
  // pool_join asks the workers to stop and waits for all of them.
  //
  // It clears `is_running`, posts the semaphore once per worker so that each
  // blocked worker wakes up and re-checks its loop condition, yields once, and
  // then joins every thread in slot order.
  pool_join :: proc(pool: ^Pool) {
      pool.is_running = false;

      worker_total := len(pool.threads);
      sync.semaphore_post(&pool.sem_available, worker_total);

      yield();

      for slot := 0; slot < worker_total; slot += 1 {
          join(pool.threads[slot]);
      }
  }
  // pool_add_task queues one task on `pool`.
  //
  // procedure:  work to run; it is later called with a pointer to the Task.
  // data:       opaque pointer stored on the Task.
  // user_index: integer stored on the Task (defaults to 0).
  //
  // The task is appended to the back of the queue and the semaphore is posted
  // once, both while the pool mutex is held.
  pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
      queued := Task{
          procedure  = procedure,
          data       = data,
          user_index = user_index,
      };

      sync.mutex_lock(&pool.mutex);
      append(&pool.tasks, queued);
      sync.semaphore_post(&pool.sem_available, 1);
      sync.mutex_unlock(&pool.mutex);
  }
  // pool_try_and_pop_task removes and returns the task at the front of the
  // queue, if there is one.
  //
  // Returns:
  //   task:     the popped task (zero value when got_task is false).
  //   got_task: true when a task was taken, false when the queue was empty.
  //
  // When a task is taken, `processing_task_count` is incremented atomically
  // before the mutex is released; pool_do_work decrements it again once the
  // task has run.
  //
  // The mutex is acquired with a blocking lock rather than a try-lock. A
  // worker reaches this procedure after consuming one semaphore permit; if it
  // gave up because the mutex happened to be busy (for example while
  // pool_add_task is still holding it to post that very permit), the permit
  // would be spent with the task still queued and no wake-up left for it.
  pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
      sync.mutex_lock(&pool.mutex);
      defer sync.mutex_unlock(&pool.mutex);

      if len(pool.tasks) == 0 {
          return;
      }

      // Count the task as in-flight before it leaves the queue so that
      // "queue empty and nothing processing" is never observed in between.
      intrinsics.atomic_add(&pool.processing_task_count, 1);
      task = pool.tasks[0];
      ordered_remove(&pool.tasks, 0);
      got_task = true;
      return;
  }
  // pool_do_work runs `task` on the calling thread and then atomically
  // decrements the pool's `processing_task_count`, balancing the increment
  // made when the task was popped.
  pool_do_work :: proc(pool: ^Pool, task: ^Task) {
      // Deferred so the decrement happens after the task procedure returns.
      defer intrinsics.atomic_sub(&pool.processing_task_count, 1);

      run := task.procedure;
      run(task);
  }
  // pool_wait_and_process blocks until the pool has no queued tasks and no
  // task in flight, helping out by running tasks on the calling thread while
  // it waits, and then joins the pool.
  pool_wait_and_process :: proc(pool: ^Pool) {
      for {
          // Done once the queue is empty and nothing is being processed.
          if len(pool.tasks) == 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
              break;
          }

          // Take a task ourselves if one can be had.
          mine, have_one := pool_try_and_pop_task(pool);
          if have_one {
              pool_do_work(pool, &mine);
          }

          // Safety kick: tasks are queued but nobody is working on anything,
          // so post one permit per queued task to wake the workers again.
          if len(pool.tasks) != 0 {
              if intrinsics.atomic_load(&pool.processing_task_count) == 0 {
                  sync.mutex_lock(&pool.mutex);
                  sync.semaphore_post(&pool.sem_available, len(pool.tasks));
                  sync.mutex_unlock(&pool.mutex);
              }
          }

          yield();
      }

      pool_join(pool);
  }