Browse Source

Clean up thread pool code

gingerBill 6 years ago
parent
commit
772c8779fa
3 changed files with 65 additions and 75 deletions
  1. 3 12
      src/gb/gb.h
  2. 6 5
      src/parser.cpp
  3. 56 58
      src/thread_pool.cpp

+ 3 - 12
src/gb/gb.h

@@ -918,10 +918,7 @@ GB_DEF void gb_lfence      (void);
 
 
 #if defined(GB_SYSTEM_WINDOWS)
-typedef struct gbSemaphore {
-	void *win32_handle;
-	LONG count;
-} gbSemaphore;
+typedef struct gbSemaphore { void *win32_handle;}      gbSemaphore;
 #elif defined(GB_SYSTEM_OSX)
 typedef struct gbSemaphore { semaphore_t osx_handle; } gbSemaphore;
 #elif defined(GB_SYSTEM_UNIX)
@@ -4593,21 +4590,15 @@ gb_inline void gb_semaphore_release(gbSemaphore *s) { gb_semaphore_post(s, 1); }
 #if defined(GB_SYSTEM_WINDOWS)
 	gb_inline void gb_semaphore_init(gbSemaphore *s) {
 		s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL);
-		s->count = 0;
 	}
 	gb_inline void gb_semaphore_destroy(gbSemaphore *s) {
 		CloseHandle(s->win32_handle);
 	}
 	gb_inline void gb_semaphore_post(gbSemaphore *s, i32 count) {
-		_InterlockedIncrement(&s->count);
-		if (ReleaseSemaphore(s->win32_handle, count, NULL) == FALSE) {
-			_InterlockedDecrement(&s->count);
-		}
+		ReleaseSemaphore(s->win32_handle, count, NULL);
 	}
 	gb_inline void gb_semaphore_wait(gbSemaphore *s) {
-		if (WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE) == WAIT_OBJECT_0) {
-			_InterlockedDecrement(&s->count);
-		}
+		WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
 	}
 
 #elif defined(GB_SYSTEM_OSX)

+ 6 - 5
src/parser.cpp

@@ -4798,7 +4798,8 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 	GB_ASSERT(init_filename.text[init_filename.len] == 0);
 
 	isize thread_count = gb_max(build_context.thread_count, 1);
-	thread_pool_init(&parser_thread_pool, heap_allocator(), thread_count, "ParserWork");
+	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+	thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork");
 
 	String init_fullpath = path_to_full_path(heap_allocator(), init_filename);
 	if (!path_is_directory(init_fullpath)) {
@@ -4819,12 +4820,12 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 	p->init_fullpath = init_fullpath;
 
 	thread_pool_start(&parser_thread_pool);
-	thread_pool_kick_and_wait(&parser_thread_pool);
+	thread_pool_wait_to_process(&parser_thread_pool);
 
 	// NOTE(bill): Get the last error and use that
-	for (isize i = parser_thread_pool.thread_count-1; i >= 0; i--) {
-		gbThread *t = &parser_thread_pool.threads[i];
-		ParseFileError err = cast(ParseFileError)t->return_value;
+	for (isize i = parser_thread_pool.task_tail-1; i >= 0; i--) {
+		WorkerTask *task = &parser_thread_pool.tasks[i];
+		ParseFileError err = cast(ParseFileError)task->result;
 		if (err != ParseFile_None) {
 			return err;
 		}

+ 56 - 58
src/thread_pool.cpp

@@ -6,20 +6,21 @@ typedef WORKER_TASK_PROC(WorkerTaskProc);
 struct WorkerTask {
 	WorkerTaskProc *do_work;
 	void *data;
+	isize result;
 };
 
 
 struct ThreadPool {
-	gbMutex     task_mutex;
 	gbMutex     mutex;
-	gbSemaphore semaphore;
+	gbSemaphore sem_available;
 	gbAtomic32  processing_work_count;
 	bool        is_running;
 
 	gbAllocator allocator;
 
 	WorkerTask *tasks;
-	isize volatile task_count;
+	isize volatile task_head;
+	isize volatile task_tail;
 	isize volatile task_capacity;
 
 	gbThread *threads;
@@ -40,14 +41,14 @@ GB_THREAD_PROC(worker_thread_internal);
 
 void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
 	pool->allocator = a;
-	pool->task_count = 0;
+	pool->task_head = 0;
+	pool->task_tail = 0;
 	pool->task_capacity = 1024;
 	pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity);
-	pool->threads = gb_alloc_array(a, gbThread, thread_count);
-	pool->thread_count = thread_count;
-	gb_mutex_init(&pool->task_mutex);
+	pool->thread_count = gb_max(thread_count, 0);
+	pool->threads = gb_alloc_array(a, gbThread, pool->thread_count);
 	gb_mutex_init(&pool->mutex);
-	gb_semaphore_init(&pool->semaphore);
+	gb_semaphore_init(&pool->sem_available);
 	pool->is_running = true;
 
 	pool->worker_prefix_len = 0;
@@ -63,6 +64,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count
 		gb_thread_init(t);
 		t->user_index = i;
 		#if 0
+		// TODO(bill): Fix this on Linux as it causes a seg-fault
 		if (pool->worker_prefix_len > 0) {
 			char worker_name[16] = {};
 			gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i);
@@ -82,9 +84,9 @@ void thread_pool_start(ThreadPool *pool) {
 void thread_pool_join(ThreadPool *pool) {
 	pool->is_running = false;
 
-	for (isize i = 0; i < pool->thread_count; i++) {
-		gb_semaphore_release(&pool->semaphore);
-	}
+	gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count);
+
+	gb_yield();
 
 	for (isize i = 0; i < pool->thread_count; i++) {
 		gbThread *t = &pool->threads[i];
@@ -96,25 +98,24 @@ void thread_pool_join(ThreadPool *pool) {
 void thread_pool_destroy(ThreadPool *pool) {
 	thread_pool_join(pool);
 
-	gb_semaphore_destroy(&pool->semaphore);
+	gb_semaphore_destroy(&pool->sem_available);
 	gb_mutex_destroy(&pool->mutex);
-	gb_mutex_destroy(&pool->task_mutex);
 	gb_free(pool->allocator, pool->threads);
 	pool->thread_count = 0;
 	gb_free(pool->allocator, pool->tasks);
-	pool->task_count = 0;
+	pool->task_head = 0;
+	pool->task_tail = 0;
 	pool->task_capacity = 0;
-
 }
 
 
 void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	gb_mutex_lock(&pool->task_mutex);
+	gb_mutex_lock(&pool->mutex);
 
-	if (pool->task_count == pool->task_capacity) {
+	if (pool->task_tail == pool->task_capacity) {
 		isize new_cap = 2*pool->task_capacity + 8;
 		WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap);
-		gb_memmove(new_tasks, pool->tasks, pool->task_count*gb_size_of(WorkerTask));
+		gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask));
 		pool->tasks = new_tasks;
 		pool->task_capacity = new_cap;
 	}
@@ -122,35 +123,42 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
 	task.do_work = proc;
 	task.data = data;
 
-	pool->tasks[pool->task_count++] = task;
-
-	gb_semaphore_post(&pool->semaphore, 1);
-	gb_mutex_unlock(&pool->task_mutex);
+	pool->tasks[pool->task_tail++] = task;
+	gb_semaphore_post(&pool->sem_available, 1);
+	gb_mutex_unlock(&pool->mutex);
 }
 
-void thread_pool_kick(ThreadPool *pool) {
-	gb_mutex_lock(&pool->task_mutex);
-	if (pool->task_count > 0) {
-		isize count = gb_min(pool->task_count, pool->thread_count);
-		for (isize i = 0; i < count; i++) {
-			gb_semaphore_post(&pool->semaphore, 1);
+bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) {
+	bool got_task = false;
+	if (gb_mutex_try_lock(&pool->mutex)) {
+		if (pool->task_tail > pool->task_head) {
+			gb_atomic32_fetch_add(&pool->processing_work_count, +1);
+			*task = pool->tasks[pool->task_head++];
+			got_task = true;
 		}
+		gb_mutex_unlock(&pool->mutex);
 	}
-	gb_mutex_unlock(&pool->task_mutex);
+	return got_task;
+}
+void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) {
+	task->result = task->do_work(task->data);
+	gb_atomic32_fetch_add(&pool->processing_work_count, -1);
 }
-void thread_pool_kick_and_wait(ThreadPool *pool) {
-	thread_pool_kick(pool);
-
-	isize return_value = 0;
-	while (pool->task_count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
-
-		if (pool->task_count > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) {
-			gb_mutex_lock(&pool->task_mutex);
-			for (isize i = 0; i < pool->task_count; i++) {
-				gb_semaphore_post(&pool->semaphore, 1);
-			}
-			gb_mutex_unlock(&pool->task_mutex);
+
+void thread_pool_wait_to_process(ThreadPool *pool) {
+	while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) {
+		WorkerTask task = {};
+		if (thread_pool_try_and_pop_task(pool, &task)) {
+			thread_pool_do_work(pool, &task);
+		}
+
+		// Safety-kick
+		if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) {
+			gb_mutex_lock(&pool->mutex);
+			gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head));
+			gb_mutex_unlock(&pool->mutex);
 		}
+
 		gb_yield();
 	}
 
@@ -160,27 +168,17 @@ void thread_pool_kick_and_wait(ThreadPool *pool) {
 
 GB_THREAD_PROC(worker_thread_internal) {
 	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-	thread->return_value = 0;
 	while (pool->is_running) {
-		gb_semaphore_wait(&pool->semaphore);
+		gb_semaphore_wait(&pool->sem_available);
 
 		WorkerTask task = {};
-		bool got_task = false;
-
-		if (gb_mutex_try_lock(&pool->task_mutex)) {
-			if (pool->task_count > 0) {
-				gb_atomic32_fetch_add(&pool->processing_work_count, +1);
-				task = pool->tasks[--pool->task_count];
-				got_task = true;
-			}
-			gb_mutex_unlock(&pool->task_mutex);
-		}
-
-		if (got_task) {
-			thread->return_value = task.do_work(task.data);
-			gb_atomic32_fetch_add(&pool->processing_work_count, -1);
+		if (thread_pool_try_and_pop_task(pool, &task)) {
+			thread_pool_do_work(pool, &task);
 		}
 	}
-	return thread->return_value;
+	// Cascade
+	gb_semaphore_release(&pool->sem_available);
+
+	return 0;
 }