123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package thread
- import "core:intrinsics"
- import "core:sync"
- import "core:mem"
- Task_Status :: enum i32 {
- Ready,
- Busy,
- Waiting,
- Term,
- }
- Task_Proc :: #type proc(task: ^Task)
- Task :: struct {
- procedure: Task_Proc,
- data: rawptr,
- user_index: int,
- }
- Task_Id :: distinct i32
- INVALID_TASK_ID :: Task_Id(-1)
- Pool :: struct {
- allocator: mem.Allocator,
- mutex: sync.Mutex,
- sem_available: sync.Semaphore,
- processing_task_count: int, // atomic
- is_running: bool,
- threads: []^Thread,
- tasks: [dynamic]Task,
- }
- pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
- worker_thread_internal :: proc(t: ^Thread) {
- pool := (^Pool)(t.data)
- for pool.is_running {
- sync.semaphore_wait_for(&pool.sem_available)
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task)
- }
- }
- sync.semaphore_post(&pool.sem_available, 1)
- }
- context.allocator = allocator
- pool.allocator = allocator
- pool.tasks = make([dynamic]Task)
- pool.threads = make([]^Thread, thread_count)
- sync.mutex_init(&pool.mutex)
- sync.semaphore_init(&pool.sem_available)
- pool.is_running = true
- for _, i in pool.threads {
- t := create(worker_thread_internal)
- t.user_index = i
- t.data = pool
- pool.threads[i] = t
- }
- }
- pool_destroy :: proc(pool: ^Pool) {
- delete(pool.tasks)
- for thread in &pool.threads {
- destroy(thread)
- }
- delete(pool.threads, pool.allocator)
- sync.mutex_destroy(&pool.mutex)
- sync.semaphore_destroy(&pool.sem_available)
- }
- pool_start :: proc(pool: ^Pool) {
- for t in pool.threads {
- start(t)
- }
- }
- pool_join :: proc(pool: ^Pool) {
- pool.is_running = false
- sync.semaphore_post(&pool.sem_available, len(pool.threads))
- yield()
- for t in pool.threads {
- join(t)
- }
- }
- pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
- sync.mutex_lock(&pool.mutex)
- defer sync.mutex_unlock(&pool.mutex)
- task: Task
- task.procedure = procedure
- task.data = data
- task.user_index = user_index
- append(&pool.tasks, task)
- sync.semaphore_post(&pool.sem_available, 1)
- }
- pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
- if sync.mutex_try_lock(&pool.mutex) {
- if len(pool.tasks) != 0 {
- intrinsics.atomic_add(&pool.processing_task_count, 1)
- task = pop_front(&pool.tasks)
- got_task = true
- }
- sync.mutex_unlock(&pool.mutex)
- }
- return
- }
- pool_do_work :: proc(pool: ^Pool, task: ^Task) {
- task.procedure(task)
- intrinsics.atomic_sub(&pool.processing_task_count, 1)
- }
- pool_wait_and_process :: proc(pool: ^Pool) {
- for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 {
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task)
- }
- // Safety kick
- if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
- sync.mutex_lock(&pool.mutex)
- sync.semaphore_post(&pool.sem_available, len(pool.tasks))
- sync.mutex_unlock(&pool.mutex)
- }
- yield()
- }
- pool_join(pool)
- }
|