// thread_pool.odin (3.0 KB)

  1. package thread
  2. import "intrinsics"
  3. import "core:sync"
  4. import "core:mem"
  // Task_Status enumerates task states, backed by an i32.
  // The values are spelled out explicitly; they equal the implicit
  // declaration-order numbering (0 through 3).
  // NOTE(review): nothing in the visible part of this file reads or writes a
  // Task_Status — confirm where it is consumed before changing it.
  Task_Status :: enum i32 {
      Ready   = 0,
      Busy    = 1,
      Waiting = 2,
      Term    = 3, // presumably "terminated" — confirm against users of this enum
  }
  // Task_Proc is the signature of a task entry point: it receives a pointer to
  // the Task being executed and returns nothing.
  Task_Proc :: #type proc(task: ^Task);

  // Task is one unit of work queued on a Pool.
  Task :: struct {
      procedure:  Task_Proc, // called by pool_do_work with a pointer to this Task
      data:       rawptr,    // opaque pointer supplied by the caller of pool_add_task
      user_index: int,       // caller-supplied value; pool_add_task defaults it to 0
  }
  // Task_Id is a distinct i32, so it does not implicitly mix with plain integers.
  // NOTE(review): no procedure in the visible part of this file produces or
  // consumes a Task_Id — confirm its users.
  Task_Id :: distinct i32;

  // INVALID_TASK_ID is the Task_Id with value -1.
  INVALID_TASK_ID : Task_Id : -1;
  // Pool is a fixed set of worker threads fed from a shared task queue.
  // Set it up with pool_init, launch the workers with pool_start, queue work
  // with pool_add_task, and shut down with pool_join / pool_destroy.
  Pool :: struct {
      // Allocator recorded by pool_init; pool_destroy uses it to free `threads`.
      allocator: mem.Allocator,

      // Held while `tasks` is appended to (pool_add_task) or popped from
      // (pool_try_and_pop_task).
      mutex: sync.Mutex,

      // Posted once per queued task; worker threads wait on it before trying
      // to pop a task.
      sem_available: sync.Semaphore,

      // atomic: incremented when a task is popped, decremented after its
      // procedure returns.
      processing_task_count: int,

      // Set to true by pool_init and to false by pool_join; workers loop while
      // it is true.
      is_running: bool,

      // Worker threads created by pool_init (one slot per requested thread).
      threads: []^Thread,

      // Pending tasks: appended at the back, popped from the front.
      tasks: [dynamic]Task,
  }
  // pool_init prepares `pool` for use: it records the allocator, creates the
  // task queue and the worker slice, initialises the mutex and semaphore, marks
  // the pool as running and creates `thread_count` worker threads.
  // This procedure only calls `create` on each worker; pool_start is what calls
  // `start` on them.
  //
  // Inputs:
  // - pool:         the pool to initialise
  // - thread_count: number of worker threads to create
  // - allocator:    becomes context.allocator for every allocation made here
  //                 and is stored in pool.allocator
  pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
      // Body executed by each worker thread.
      worker_thread_internal :: proc(t: ^Thread) {
          owner := (^Pool)(t.data);

          for owner.is_running {
              sync.semaphore_wait_for(&owner.sem_available);

              // The pop uses a try-lock, so waking up does not guarantee a task.
              task, ok := pool_try_and_pop_task(owner);
              if ok {
                  pool_do_work(owner, &task);
              }
          }

          // On exit, pass one signal along on the shared semaphore.
          sync.semaphore_post(&owner.sem_available, 1);
      }

      context.allocator = allocator;
      pool.allocator = allocator;

      pool.tasks   = make([dynamic]Task);
      pool.threads = make([]^Thread, thread_count);

      sync.mutex_init(&pool.mutex);
      sync.semaphore_init(&pool.sem_available);
      pool.is_running = true;

      for i in 0..<len(pool.threads) {
          worker := create(worker_thread_internal);
          worker.user_index = i;
          worker.data = pool;
          pool.threads[i] = worker;
      }
  }
  // pool_destroy releases everything pool_init set up: the task queue, each
  // worker thread, the worker slice (freed with pool.allocator), the mutex and
  // the semaphore.
  // This procedure does not call pool_join itself.
  // NOTE(review): presumably the workers must already be joined before this
  // runs — confirm against callers.
  pool_destroy :: proc(pool: ^Pool) {
      delete(pool.tasks);

      for worker in pool.threads {
          destroy(worker);
      }
      delete(pool.threads, pool.allocator);

      sync.mutex_destroy(&pool.mutex);
      sync.semaphore_destroy(&pool.sem_available);
  }
  // pool_start calls `start` on every worker thread created by pool_init, in
  // slice order.
  pool_start :: proc(pool: ^Pool) {
      for i in 0..<len(pool.threads) {
          start(pool.threads[i]);
      }
  }
  // pool_join asks the workers to stop and waits for them.
  // It clears is_running, posts the semaphore once per worker so that waiting
  // workers wake up and re-check the flag, yields once, and then joins every
  // worker thread.
  pool_join :: proc(pool: ^Pool) {
      pool.is_running = false;

      worker_count := len(pool.threads);
      sync.semaphore_post(&pool.sem_available, worker_count);

      yield();

      for i in 0..<worker_count {
          join(pool.threads[i]);
      }
  }
  // pool_add_task appends a task to the pool's queue and signals the semaphore
  // once so that a waiting worker can pick it up.
  //
  // Inputs:
  // - pool:       the pool to queue the task on
  // - procedure:  the procedure to run; it receives a pointer to the Task
  // - data:       opaque pointer stored in Task.data
  // - user_index: value stored in Task.user_index (defaults to 0)
  pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
      task := Task{
          procedure  = procedure,
          data       = data,
          user_index = user_index,
      };

      // Hold the mutex only for the append. `defer` is scoped to this block,
      // so the mutex is released before the semaphore is posted below.
      {
          sync.mutex_lock(&pool.mutex);
          defer sync.mutex_unlock(&pool.mutex);
          append(&pool.tasks, task);
      }

      // Post after unlocking: a worker woken by this signal goes straight to
      // pool_try_and_pop_task, which uses mutex_try_lock. If the mutex were
      // still held here, that try-lock could fail, the worker would have
      // consumed the signal without taking the task, and the task would sit in
      // the queue with no pending wake-up.
      sync.semaphore_post(&pool.sem_available, 1);
  }
  // pool_try_and_pop_task makes a non-blocking attempt to take the task at the
  // front of the queue.
  //
  // Returns:
  // - task:     the popped task (zero value when got_task is false)
  // - got_task: true only if the mutex was acquired and the queue was non-empty
  //
  // A false result therefore means either "queue empty" or "mutex was busy".
  pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
      if !sync.mutex_try_lock(&pool.mutex) {
          return;
      }
      defer sync.mutex_unlock(&pool.mutex);

      if len(pool.tasks) == 0 {
          return;
      }

      // Bump the in-flight counter before removing the task from the queue,
      // so the task is always accounted for by either the queue length or
      // the counter.
      intrinsics.atomic_add(&pool.processing_task_count, 1);
      task = pop_front(&pool.tasks);
      got_task = true;
      return;
  }
  // pool_do_work runs `task` by calling its procedure with the task pointer,
  // then atomically decrements the pool's in-flight counter (the matching
  // increment happens in pool_try_and_pop_task).
  pool_do_work :: proc(pool: ^Pool, task: ^Task) {
      // Deferred so the decrement runs right after the task procedure returns.
      defer intrinsics.atomic_sub(&pool.processing_task_count, 1);
      task.procedure(task);
  }
  // pool_wait_and_process drains the pool from the calling thread: while tasks
  // are queued or in flight it tries to pop and run a task itself, yielding
  // between attempts. Once the queue is empty and nothing is in flight it calls
  // pool_join, which stops and joins the workers.
  pool_wait_and_process :: proc(pool: ^Pool) {
      for {
          // Finished when nothing is queued and nothing is being processed.
          if len(pool.tasks) == 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
              break;
          }

          task, got := pool_try_and_pop_task(pool);
          if got {
              pool_do_work(pool, &task);
          }

          // Safety kick: tasks are queued but nothing is in flight, so post
          // the semaphore once per queued task to wake the workers again.
          stalled := len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0;
          if stalled {
              sync.mutex_lock(&pool.mutex);
              defer sync.mutex_unlock(&pool.mutex);
              sync.semaphore_post(&pool.sem_available, len(pool.tasks));
          }

          yield();
      }

      pool_join(pool);
  }