123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package thread
- import "intrinsics"
- import "core:sync"
- import "core:mem"
- Task_Status :: enum i32 {
- Ready,
- Busy,
- Waiting,
- Term,
- }
- Task_Proc :: #type proc(task: ^Task);
- Task :: struct {
- procedure: Task_Proc,
- data: rawptr,
- user_index: int,
- }
- Task_Id :: distinct i32;
- INVALID_TASK_ID :: Task_Id(-1);
- Pool :: struct {
- allocator: mem.Allocator,
- mutex: sync.Mutex,
- sem_available: sync.Semaphore,
- processing_task_count: int, // atomic
- is_running: bool,
- threads: []^Thread,
- tasks: [dynamic]Task,
- }
- pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
- worker_thread_internal :: proc(t: ^Thread) {
- pool := (^Pool)(t.data);
- for pool.is_running {
- sync.semaphore_wait_for(&pool.sem_available);
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task);
- }
- }
- sync.semaphore_post(&pool.sem_available, 1);
- }
- context.allocator = allocator;
- pool.allocator = allocator;
- pool.tasks = make([dynamic]Task);
- pool.threads = make([]^Thread, thread_count);
- sync.mutex_init(&pool.mutex);
- sync.semaphore_init(&pool.sem_available);
- pool.is_running = true;
- for _, i in pool.threads {
- t := create(worker_thread_internal);
- t.user_index = i;
- t.data = pool;
- pool.threads[i] = t;
- }
- }
- pool_destroy :: proc(pool: ^Pool) {
- delete(pool.tasks);
- for thread in &pool.threads {
- destroy(thread);
- }
- delete(pool.threads, pool.allocator);
- sync.mutex_destroy(&pool.mutex);
- sync.semaphore_destroy(&pool.sem_available);
- }
- pool_start :: proc(pool: ^Pool) {
- for t in pool.threads {
- start(t);
- }
- }
- pool_join :: proc(pool: ^Pool) {
- pool.is_running = false;
- sync.semaphore_post(&pool.sem_available, len(pool.threads));
- yield();
- for t in pool.threads {
- join(t);
- }
- }
- pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
- sync.mutex_lock(&pool.mutex);
- defer sync.mutex_unlock(&pool.mutex);
- task: Task;
- task.procedure = procedure;
- task.data = data;
- task.user_index = user_index;
- append(&pool.tasks, task);
- sync.semaphore_post(&pool.sem_available, 1);
- }
- pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
- if sync.mutex_try_lock(&pool.mutex) {
- if len(pool.tasks) != 0 {
- intrinsics.atomic_add(&pool.processing_task_count, 1);
- task = pop_front(&pool.tasks);
- got_task = true;
- }
- sync.mutex_unlock(&pool.mutex);
- }
- return;
- }
- pool_do_work :: proc(pool: ^Pool, task: ^Task) {
- task.procedure(task);
- intrinsics.atomic_sub(&pool.processing_task_count, 1);
- }
- pool_wait_and_process :: proc(pool: ^Pool) {
- for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 {
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task);
- }
- // Safety kick
- if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
- sync.mutex_lock(&pool.mutex);
- sync.semaphore_post(&pool.sem_available, len(pool.tasks));
- sync.mutex_unlock(&pool.mutex);
- }
- yield();
- }
- pool_join(pool);
- }
|