package thread import "core:intrinsics" import "core:sync" import "core:mem" Task_Status :: enum i32 { Ready, Busy, Waiting, Term, } Task_Proc :: #type proc(task: ^Task) Task :: struct { procedure: Task_Proc, data: rawptr, user_index: int, } Task_Id :: distinct i32 INVALID_TASK_ID :: Task_Id(-1) Pool :: struct { allocator: mem.Allocator, mutex: sync.Mutex, sem_available: sync.Semaphore, processing_task_count: int, // atomic is_running: bool, threads: []^Thread, tasks: [dynamic]Task, } pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) { worker_thread_internal :: proc(t: ^Thread) { pool := (^Pool)(t.data) for pool.is_running { sync.semaphore_wait_for(&pool.sem_available) if task, ok := pool_try_and_pop_task(pool); ok { pool_do_work(pool, &task) } } sync.semaphore_post(&pool.sem_available, 1) } context.allocator = allocator pool.allocator = allocator pool.tasks = make([dynamic]Task) pool.threads = make([]^Thread, thread_count) sync.mutex_init(&pool.mutex) sync.semaphore_init(&pool.sem_available) pool.is_running = true for _, i in pool.threads { t := create(worker_thread_internal) t.user_index = i t.data = pool pool.threads[i] = t } } pool_destroy :: proc(pool: ^Pool) { delete(pool.tasks) for thread in &pool.threads { destroy(thread) } delete(pool.threads, pool.allocator) sync.mutex_destroy(&pool.mutex) sync.semaphore_destroy(&pool.sem_available) } pool_start :: proc(pool: ^Pool) { for t in pool.threads { start(t) } } pool_join :: proc(pool: ^Pool) { pool.is_running = false sync.semaphore_post(&pool.sem_available, len(pool.threads)) yield() for t in pool.threads { join(t) } } pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) { sync.mutex_lock(&pool.mutex) defer sync.mutex_unlock(&pool.mutex) task: Task task.procedure = procedure task.data = data task.user_index = user_index append(&pool.tasks, task) sync.semaphore_post(&pool.sem_available, 1) } pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) { if sync.mutex_try_lock(&pool.mutex) { if len(pool.tasks) != 0 { intrinsics.atomic_add(&pool.processing_task_count, 1) task = pop_front(&pool.tasks) got_task = true } sync.mutex_unlock(&pool.mutex) } return } pool_do_work :: proc(pool: ^Pool, task: ^Task) { task.procedure(task) intrinsics.atomic_sub(&pool.processing_task_count, 1) } pool_wait_and_process :: proc(pool: ^Pool) { for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 { if task, ok := pool_try_and_pop_task(pool); ok { pool_do_work(pool, &task) } // Safety kick if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 { sync.mutex_lock(&pool.mutex) sync.semaphore_post(&pool.sem_available, len(pool.tasks)) sync.mutex_unlock(&pool.mutex) } yield() } pool_join(pool) }