
Thread pool fixes

gingerBill 6 years ago
parent
commit c93872cc13
2 changed files with 176 additions and 6 deletions
  1. build.bat (+7 -6)
  2. src/thread_pool.cpp (+169 -0)

+ 7 - 6
build.bat

@@ -4,14 +4,14 @@
 set exe_name=odin.exe
 
 :: Debug = 0, Release = 1
-set release_mode=0
+set release_mode=1
 set compiler_flags= -nologo -Oi -TP -fp:precise -Gm- -MP -FC -GS- -EHsc- -GR-
 
 if %release_mode% EQU 0 ( rem Debug
-	set compiler_flags=%compiler_flags% -Od -MDd -Z7
+	set compiler_flags=%compiler_flags% -Od -MDd -Zi
 	rem -DDISPLAY_TIMING
 ) else ( rem Release
-	set compiler_flags=%compiler_flags% -O2 -MT -Z7 -DNO_ARRAY_BOUNDS_CHECK
+	set compiler_flags=%compiler_flags% -O2 -MT -Zi -DNO_ARRAY_BOUNDS_CHECK
 )
 
 set compiler_warnings= ^
@@ -40,9 +40,10 @@ del *.pdb > NUL 2> NUL
 del *.ilk > NUL 2> NUL
 
 
-cl %compiler_settings% "src\main.cpp" ^
-	/link %linker_settings% -OUT:%exe_name% ^
-	&& odin run examples/demo/demo.odin
+rem cl %compiler_settings% "src\main.cpp" ^
+	rem /link %linker_settings% -OUT:%exe_name% ^
+	rem && odin build examples/demo/demo.odin -show-timings
+odin check examples/demo/demo.odin -show-timings
 
 del *.obj > NUL 2> NUL
 

+ 169 - 0
src/thread_pool.cpp

@@ -0,0 +1,169 @@
+// thread_pool.cpp
+
+#define WORKER_TASK_PROC(name) isize name(void *data)
+typedef WORKER_TASK_PROC(WorkerTaskProc);
+
+struct WorkerTask {
+	WorkerTaskProc *do_work;
+	void *data;
+};
+
+
+struct ThreadPool {
+	gbMutex     task_mutex;
+	gbMutex     mutex;
+	gbSemaphore semaphore;
+	gbAtomic32  processing_work_count;
+	bool        is_running;
+
+	Array<WorkerTask> tasks;
+	Array<gbThread> threads;
+
+	gbAllocator original_allocator;
+
+	char worker_prefix[10];
+	i32 worker_prefix_len;
+};
+
+
+GB_ALLOCATOR_PROC(thread_pool_allocator_proc) {
+	ThreadPool *pool = cast(ThreadPool *)allocator_data;
+	return pool->original_allocator.proc(pool->original_allocator.data, type, size, 256, old_memory, old_size, flags);
+}
+
+gbAllocator thread_pool_allocator(ThreadPool *pool) {
+	gbAllocator a = {thread_pool_allocator_proc, pool};
+	return a;
+}
+
+void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix = nullptr);
+void thread_pool_destroy(ThreadPool *pool);
+void thread_pool_start(ThreadPool *pool);
+void thread_pool_join(ThreadPool *pool);
+void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
+void thread_pool_kick(ThreadPool *pool);
+void thread_pool_kick_and_wait(ThreadPool *pool);
+GB_THREAD_PROC(worker_thread_internal);
+
+void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
+	pool->original_allocator = a;
+	gbAllocator tpa = thread_pool_allocator(pool);
+	pool->tasks = array_make<WorkerTask>(tpa, 0, 1024);
+	pool->threads = array_make<gbThread>(tpa, thread_count);
+	gb_mutex_init(&pool->task_mutex);
+	gb_mutex_init(&pool->mutex);
+	gb_semaphore_init(&pool->semaphore);
+	pool->is_running = true;
+
+	pool->worker_prefix_len = 0;
+	if (worker_prefix) {
+		i32 worker_prefix_len = cast(i32)gb_strlen(worker_prefix);
+		worker_prefix_len = gb_min(worker_prefix_len, 10);
+		gb_memmove(pool->worker_prefix, worker_prefix, worker_prefix_len);
+		pool->worker_prefix_len = worker_prefix_len;
+	}
+
+	for_array(i, pool->threads) {
+		gbThread *t = &pool->threads[i];
+		gb_thread_init(t);
+		t->user_index = i;
+		if (pool->worker_prefix_len > 0) {
+			char worker_name[16] = {};
+			gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i);
+			gb_thread_set_name(t, worker_name);
+		}
+	}
+}
+
+void thread_pool_start(ThreadPool *pool) {
+	for_array(i, pool->threads) {
+		gbThread *t = &pool->threads[i];
+		gb_thread_start(t, worker_thread_internal, pool);
+	}
+}
+
+void thread_pool_join(ThreadPool *pool) {
+	pool->is_running = false;
+
+	for_array(i, pool->threads) {
+		gb_semaphore_release(&pool->semaphore);
+	}
+
+	for_array(i, pool->threads) {
+		gbThread *t = &pool->threads[i];
+		gb_thread_join(t);
+	}
+}
+
+
+void thread_pool_destroy(ThreadPool *pool) {
+	thread_pool_join(pool);
+
+	gb_semaphore_destroy(&pool->semaphore);
+	gb_mutex_destroy(&pool->mutex);
+	gb_mutex_destroy(&pool->task_mutex);
+	array_free(&pool->threads);
+	array_free(&pool->tasks);
+}
+
+
+void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
+	gb_mutex_lock(&pool->task_mutex);
+
+	WorkerTask task = {};
+	task.do_work = proc;
+	task.data = data;
+	array_add(&pool->tasks, task);
+
+	gb_mutex_unlock(&pool->task_mutex);
+
+	gb_semaphore_post(&pool->semaphore, 1);
+}
+
+void thread_pool_kick(ThreadPool *pool) {
+	if (pool->tasks.count > 0) {
+		isize count = gb_min(pool->tasks.count, pool->threads.count);
+		for (isize i = 0; i < count; i++) {
+			gb_semaphore_post(&pool->semaphore, 1);
+		}
+	}
+
+}
+void thread_pool_kick_and_wait(ThreadPool *pool) {
+	thread_pool_kick(pool);
+
+	isize return_value = 0;
+	while (pool->tasks.count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
+		gb_yield();
+	}
+
+	thread_pool_join(pool);
+}
+
+
+GB_THREAD_PROC(worker_thread_internal) {
+	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
+	thread->return_value = 0;
+	while (pool->is_running) {
+		gb_semaphore_wait(&pool->semaphore);
+
+		WorkerTask task = {};
+		bool got_task = false;
+
+		if (gb_mutex_try_lock(&pool->task_mutex)) {
+			if (pool->tasks.count > 0) {
+				gb_atomic32_fetch_add(&pool->processing_work_count, +1);
+				task = array_pop(&pool->tasks);
+				got_task = true;
+			}
+			gb_mutex_unlock(&pool->task_mutex);
+		}
+
+		if (got_task) {
+			thread->return_value = task.do_work(task.data);
+			gb_atomic32_fetch_add(&pool->processing_work_count, -1);
+		}
+	}
+	return thread->return_value;
+}
+
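
For reference, a minimal usage sketch of the pool API added above. This is not part of the commit: example_task, example_usage, the "worker" prefix, and the choice of gb_heap_allocator() are illustrative assumptions; the calls themselves follow the declarations in src/thread_pool.cpp.

// Illustrative sketch only -- not in this commit.
// WORKER_TASK_PROC(name) expands to `isize name(void *data)`.
WORKER_TASK_PROC(example_task) {   // hypothetical task procedure
	int *value = cast(int *)data;  // 'data' is whatever was passed to thread_pool_add_task
	*value *= 2;
	return 0;
}

void example_usage(void) {         // hypothetical caller
	ThreadPool pool = {};
	thread_pool_init(&pool, gb_heap_allocator(), 4, "worker"); // 4 workers named "worker0".."worker3"

	int values[8] = {1, 2, 3, 4, 5, 6, 7, 8};
	for (isize i = 0; i < 8; i++) {
		thread_pool_add_task(&pool, example_task, &values[i]); // queues the task and posts the semaphore
	}

	thread_pool_start(&pool);          // launch the worker threads
	thread_pool_kick_and_wait(&pool);  // wake the workers, spin until the queue drains, then join
	thread_pool_destroy(&pool);        // destroy the semaphore and mutexes, free the task/thread arrays
}

Each worker wakes once per gb_semaphore_post, try-locks task_mutex, and pops at most one task; processing_work_count is what lets thread_pool_kick_and_wait tell an empty queue apart from tasks that are still in flight.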