Update Thread Pool in `core:thread`

Thanks to the work of eisbehr
gingerBill, 3 years ago
commit 22b961ea53
2 changed files with 171 additions and 99 deletions
  1. core/thread/thread_pool.odin (+148 -76)
  2. examples/demo/demo.odin (+23 -23)

core/thread/thread_pool.odin (+148 -76)

@@ -1,65 +1,75 @@
 package thread
 
+/*
+	thread.Pool
+	Copyright 2022 eisbehr
+	Made available under Odin's BSD-3 license.
+*/
+
 import "core:intrinsics"
 import "core:sync"
 import "core:mem"
 
-Task_Status :: enum i32 {
-	Ready,
-	Busy,
-	Waiting,
-	Term,
-}
-
-Task_Proc :: #type proc(task: ^Task)
+Task_Proc :: #type proc(task: Task)
 
 Task :: struct {
-	procedure: Task_Proc,
-	data: rawptr,
+	procedure:  Task_Proc,
+	data:       rawptr,
 	user_index: int,
+	allocator:  mem.Allocator,
 }
 
-Task_Id :: distinct i32
-INVALID_TASK_ID :: Task_Id(-1)
+// Do not access the pool's members directly while the pool threads are running,
+// since they use different kinds of locking and mutual exclusion devices.
+// Careless access can and will lead to nasty bugs. Once initialized, the
+// pool's memory address is not allowed to change until it is destroyed.
+Pool :: struct {
+	allocator:     mem.Allocator,
+	mutex:         sync.Mutex,
+	sem_available: sync.Sema,
 
+	// the following values are atomic
+	num_waiting:       int,
+	num_in_processing: int,
+	num_outstanding:   int, // num_waiting + num_in_processing
+	num_done:          int,
+	// end of atomics
 
-Pool :: struct {
-	allocator:             mem.Allocator,
-	mutex:                 sync.Mutex,
-	sem_available:         sync.Sema,
-	processing_task_count: int, // atomic
-	is_running:            bool,
+	is_running: bool,
 
 	threads: []^Thread,
 
-	tasks: [dynamic]Task,
+	tasks:      [dynamic]Task,
+	tasks_done: [dynamic]Task,
 }
 
-pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
-	worker_thread_internal :: proc(t: ^Thread) {
-		pool := (^Pool)(t.data)
-
-		for pool.is_running {
-			sync.sema_wait(&pool.sem_available)
-
-			if task, ok := pool_try_and_pop_task(pool); ok {
-				pool_do_work(pool, &task)
-			}
-		}
-
-		sync.sema_post(&pool.sem_available, 1)
-	}
-
-
+// Once initialized, the pool's memory address is not allowed to change until
+// it is destroyed. If thread_count < 1, thread count 1 will be used.
+//
+// The thread pool requires an allocator which it either owns, or which is thread safe.
+pool_init :: proc(pool: ^Pool, thread_count: int, allocator: mem.Allocator) {
 	context.allocator = allocator
 	pool.allocator = allocator
-	pool.tasks = make([dynamic]Task)
-	pool.threads = make([]^Thread, thread_count)
+	pool.tasks      = make([dynamic]Task)
+	pool.tasks_done = make([dynamic]Task)
+	pool.threads    = make([]^Thread, max(thread_count, 1))
 
 	pool.is_running = true
 
 	for _, i in pool.threads {
-		t := create(worker_thread_internal)
+		t := create(proc(t: ^Thread) {
+			pool := (^Pool)(t.data)
+
+			for intrinsics.atomic_load(&pool.is_running) {
+				sync.wait(&pool.sem_available)
+
+				if task, ok := pool_pop_waiting(pool); ok {
+					pool_do_work(pool, task)
+				}
+			}
+
+			sync.post(&pool.sem_available, 1)
+		})
 	t.user_index = i
 	t.data = pool
 	pool.threads[i] = t
@@ -68,9 +78,10 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator
 
 pool_destroy :: proc(pool: ^Pool) {
 	delete(pool.tasks)
+	delete(pool.tasks_done)
 
-	for thread in &pool.threads {
-		destroy(thread)
+	for t in &pool.threads {
+		destroy(t)
 	}
 
 	delete(pool.threads, pool.allocator)
@@ -82,10 +93,12 @@ pool_start :: proc(pool: ^Pool) {
 	}
 }
 
+// Finish tasks that have already started processing, then shut down all pool
+// threads. Might leave over waiting tasks, any memory allocated for the
+// user data of those tasks will not be freed.
 pool_join :: proc(pool: ^Pool) {
-	pool.is_running = false
-
-	sync.sema_post(&pool.sem_available, len(pool.threads))
+	intrinsics.atomic_store(&pool.is_running, false)
+	sync.post(&pool.sem_available, len(pool.threads))
 
 	yield()
 
@@ -94,53 +107,112 @@ pool_join :: proc(pool: ^Pool) {
 	}
 }
 
-pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
-	sync.mutex_lock(&pool.mutex)
-	defer sync.mutex_unlock(&pool.mutex)
+// Add a task to the thread pool.
+//
+// Tasks can be added from any thread, not just the thread that created
+// the thread pool. You can even add tasks from inside other tasks.
+//
+// Each task also needs an allocator which it either owns, or which is thread
+// safe. By default, allocations in the task are disabled by use of the
+// nil_allocator.
+pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0, allocator := context.allocator) {
+	sync.guard(&pool.mutex)
+
+	append(&pool.tasks, Task{
+		procedure  = procedure,
+		data       = data,
+		user_index = user_index,
+		allocator  = allocator,
+	})
+	intrinsics.atomic_add(&pool.num_waiting, 1)
+	intrinsics.atomic_add(&pool.num_outstanding, 1)
+	sync.post(&pool.sem_available, 1)
+}
+
+// Number of tasks waiting to be processed. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
+	return intrinsics.atomic_load(&pool.num_waiting)
+}
+
+// Number of tasks currently being processed. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
+	return intrinsics.atomic_load(&pool.num_in_processing)
+}
+
+// Outstanding tasks are all tasks that are not done, that is, tasks that are
+// waiting, as well as tasks that are currently being processed. Only
+// informational, mostly for debugging. Don't rely on this value being
+// consistent with other num_* values.
+pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
+	return intrinsics.atomic_load(&pool.num_outstanding)
+}
 
-	task: Task
-	task.procedure = procedure
-	task.data = data
-	task.user_index = user_index
+// Number of tasks which are done processing. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
+	return intrinsics.atomic_load(&pool.num_done)
+}
 
-	append(&pool.tasks, task)
-	sync.sema_post(&pool.sem_available, 1)
+// If tasks are only being added from one thread, and this procedure is being
+// called from that same thread, it will reliably tell if the thread pool is
+// empty or not. Empty in this case means there are no tasks waiting, being
+// processed, or _done_.
+pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
+	return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0
 }
 
-pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
-	if sync.mutex_try_lock(&pool.mutex) {
-		if len(pool.tasks) != 0 {
-			intrinsics.atomic_add(&pool.processing_task_count, 1)
-			task = pop_front(&pool.tasks)
-			got_task = true
-		}
-		sync.mutex_unlock(&pool.mutex)
+// Mostly for internal use.
+pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
+	sync.guard(&pool.mutex)
+
+	if len(pool.tasks) != 0 {
+		intrinsics.atomic_sub(&pool.num_waiting, 1)
+		intrinsics.atomic_add(&pool.num_in_processing, 1)
+		task = pop_front(&pool.tasks)
+		got_task = true
 	}
+
 	return
 }
 
+// Use this to take out finished tasks.
+pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
+	sync.guard(&pool.mutex)
+
+	if len(pool.tasks_done) != 0 {
+		task = pop_front(&pool.tasks_done)
+		got_task = true
+		intrinsics.atomic_sub(&pool.num_done, 1)
+	}
 
-pool_do_work :: proc(pool: ^Pool, task: ^Task) {
-	task.procedure(task)
-	intrinsics.atomic_sub(&pool.processing_task_count, 1)
+	return
 }
 
+// Mostly for internal use.
+pool_do_work :: proc(pool: ^Pool, task: Task) {
+	{
+		context.allocator = task.allocator
+		task.procedure(task)
+	}
 
-pool_wait_and_process :: proc(pool: ^Pool) {
-	for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 {
-		if task, ok := pool_try_and_pop_task(pool); ok {
-			pool_do_work(pool, &task)
-		}
+	sync.guard(&pool.mutex)
 
-		// Safety kick
-		if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
-			sync.mutex_lock(&pool.mutex)
-			sync.sema_post(&pool.sem_available, len(pool.tasks))
-			sync.mutex_unlock(&pool.mutex)
-		}
+	append(&pool.tasks_done, task)
+	intrinsics.atomic_add(&pool.num_done, 1)
+	intrinsics.atomic_sub(&pool.num_outstanding, 1)
+	intrinsics.atomic_sub(&pool.num_in_processing, 1)
+}
 
-		yield()
+// Process the rest of the tasks, also use this thread for processing, then join
+// all the pool threads.
+pool_finish :: proc(pool: ^Pool) {
+	for task in pool_pop_waiting(pool) {
+		pool_do_work(pool, task)
 	}
-
 	pool_join(pool)
 }
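
For orientation, here is a minimal usage sketch of the reworked API, in the spirit of the demo change below. It is not part of this commit: `square_task`, `squares`, and the package layout are hypothetical names chosen for illustration; the calls use only the procedures defined above (`pool_init`, `pool_add_task`, `pool_start`, `pool_finish`, `pool_pop_done`, `pool_destroy`).

package main

import "core:fmt"
import "core:thread"

// Hypothetical result buffer; each task writes only its own slot, so the
// workers do not contend with each other.
squares: [8]int

// Worker matching the new by-value Task_Proc signature.
square_task :: proc(t: thread.Task) {
	squares[t.user_index] = t.user_index * t.user_index
}

main :: proc() {
	pool: thread.Pool
	thread.pool_init(&pool, 4, context.allocator) // the allocator is now a required argument
	defer thread.pool_destroy(&pool)

	for i in 0..<len(squares) {
		// The trailing argument is the per-task allocator introduced by this commit.
		thread.pool_add_task(&pool, square_task, nil, i, context.allocator)
	}

	thread.pool_start(&pool)
	thread.pool_finish(&pool) // process remaining tasks on this thread, then join the workers

	// Drain the new tasks_done queue.
	for {
		task, ok := thread.pool_pop_done(&pool)
		if !ok {
			break
		}
		fmt.printf("task %d -> %d\n", task.user_index, squares[task.user_index])
	}
}

The design point to note is that finished tasks stay in the pool's tasks_done queue until the caller takes them out with pool_pop_done, which is what makes collecting results like this possible.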

examples/demo/demo.odin (+23 -23)

@@ -1145,29 +1145,29 @@ threading_example :: proc() {
 		}
 	}
 
-	// { // Thread Pool
-	// 	fmt.println("\n## Thread Pool")
-	// 	task_proc :: proc(t: ^thread.Task) {
-	// 		index := t.user_index % len(prefix_table)
-	// 		for iteration in 1..=5 {
-	// 			fmt.printf("Worker Task %d is on iteration %d\n", t.user_index, iteration)
-	// 			fmt.printf("`%s`: iteration %d\n", prefix_table[index], iteration)
-	// 			time.sleep(1 * time.Millisecond)
-	// 		}
-	// 	}
-
-	// 	pool: thread.Pool
-	// 	thread.pool_init(pool=&pool, thread_count=3)
-	// 	defer thread.pool_destroy(&pool)
-
-
-	// 	for i in 0..<30 {
-	// 		thread.pool_add_task(pool=&pool, procedure=task_proc, data=nil, user_index=i)
-	// 	}
-
-	// 	thread.pool_start(&pool)
-	// 	thread.pool_wait_and_process(&pool)
-	// }
+	{ // Thread Pool
+		fmt.println("\n## Thread Pool")
+		task_proc :: proc(t: thread.Task) {
+			index := t.user_index % len(prefix_table)
+			for iteration in 1..=5 {
+				fmt.printf("Worker Task %d is on iteration %d\n", t.user_index, iteration)
+				fmt.printf("`%s`: iteration %d\n", prefix_table[index], iteration)
+				time.sleep(1 * time.Millisecond)
+			}
+		}
+
+		pool: thread.Pool
+		thread.pool_init(pool=&pool, thread_count=3, allocator=context.allocator)
+		defer thread.pool_destroy(&pool)
+
+
+		for i in 0..<30 {
+			thread.pool_add_task(pool=&pool, procedure=task_proc, data=nil, user_index=i)
+		}
+
+		thread.pool_start(&pool)
+		thread.pool_finish(&pool)
+	}
 }