
Unify thread pool logic across the rest of the compiler, using a global thread pool

gingerBill 4 years ago
parent commit ad3a3547d6
9 changed files with 189 additions and 142 deletions
  1. src/checker.cpp (+45 -72)
  2. src/common.cpp (+1 -3)
  3. src/llvm_backend.cpp (+2 -5)
  4. src/llvm_backend_general.cpp (+0 -2)
  5. src/main.cpp (+17 -0)
  6. src/parser.cpp (+37 -32)
  7. src/parser.hpp (+1 -3)
  8. src/thread_pool.cpp (+46 -25)
  9. src/threading.cpp (+40 -0)

src/checker.cpp (+45 -72)

@@ -4140,7 +4140,7 @@ struct ThreadProcCheckerSection {
 };
 
 
-void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
+void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) {
 	isize thread_count = gb_max(build_context.thread_count, 1);
 	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
 	if (!build_context.threaded_checker) {
@@ -4173,33 +4173,18 @@ void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
 	}
 	GB_ASSERT(remaining_count <= 0);
 
-	Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
-	for (isize i = 0; i < worker_count; i++) {
-		thread_init(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_start(threads+i, proc, thread_data+i);
+	
+	for (isize i = 0; i < thread_count; i++) {
+		global_thread_pool_add_task(proc, thread_data+i);
 	}
-	Thread dummy_main_thread = {};
-	dummy_main_thread.user_data = thread_data+worker_count;
-	proc(&dummy_main_thread);
-
+	global_thread_pool_wait();
 	semaphore_wait(&c->info.collect_semaphore);
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_join(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_destroy(threads+i);
-	}
 }
 
 
-THREAD_PROC(thread_proc_collect_entities) {
-	auto *data = cast(ThreadProcCheckerSection *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_collect_entities) {
+	auto *cs = cast(ThreadProcCheckerSection *)data;
+	Checker *c = cs->checker;
 	CheckerContext collect_entity_ctx = make_checker_context(c);
 	defer (destroy_checker_context(&collect_entity_ctx));
 
@@ -4208,8 +4193,8 @@ THREAD_PROC(thread_proc_collect_entities) {
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
 
-	isize offset = data->offset;
-	isize file_end = gb_min(offset+data->count, c->info.files.entries.count);
+	isize offset = cs->offset;
+	isize file_end = gb_min(offset+cs->count, c->info.files.entries.count);
 
 	for (isize i = offset; i < file_end; i++) {
 		AstFile *f = c->info.files.entries[i].value;
@@ -4246,9 +4231,9 @@ void check_export_entities_in_pkg(CheckerContext *ctx, AstPackage *pkg, UntypedE
 	}
 }
 
-THREAD_PROC(thread_proc_check_export_entities) {
-	auto data = cast(ThreadProcCheckerSection *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_check_export_entities) {
+	auto cs = cast(ThreadProcCheckerSection *)data;
+	Checker *c = cs->checker;
 
 	CheckerContext ctx = make_checker_context(c);
 	defer (destroy_checker_context(&ctx));
@@ -4256,8 +4241,8 @@ THREAD_PROC(thread_proc_check_export_entities) {
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
 
-	isize end = gb_min(data->offset + data->count, c->info.packages.entries.count);
-	for (isize i = data->offset; i < end; i++) {
+	isize end = gb_min(cs->offset + cs->count, c->info.packages.entries.count);
+	for (isize i = cs->offset; i < end; i++) {
 		AstPackage *pkg = c->info.packages.entries[i].value;
 		check_export_entities_in_pkg(&ctx, pkg, &untyped);
 	}
@@ -4575,15 +4560,19 @@ void calculate_global_init_order(Checker *c) {
 }
 
 
-void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
+bool check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
 	if (pi == nullptr) {
-		return;
+		return false;
 	}
 	if (pi->type == nullptr) {
-		return;
+		return false;
 	}
+	Entity *e = pi->decl->entity;
 	if (pi->decl->proc_checked) {
-		return;
+		if (e != nullptr) {
+			GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+		}
+		return true;
 	}
 
 	CheckerContext ctx = make_checker_context(c);
@@ -4601,14 +4590,13 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
 			token = ast_token(pi->poly_def_node);
 		}
 		error(token, "Unspecialized polymorphic procedure '%.*s'", LIT(name));
-		return;
+		return false;
 	}
 
 	if (pt->is_polymorphic && pt->is_poly_specialized) {
-		Entity *e = pi->decl->entity;
 		if ((e->flags & EntityFlag_Used) == 0) {
 			// NOTE(bill, 2019-08-31): It was never used, don't check
-			return;
+			return false;
 		}
 	}
 
@@ -4622,16 +4610,17 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
 		ctx.state_flags |= StateFlag_no_bounds_check;
 		ctx.state_flags &= ~StateFlag_bounds_check;
 	}
-	if (pi->body != nullptr && pi->decl->entity != nullptr) {
-		GB_ASSERT((pi->decl->entity->flags & EntityFlag_ProcBodyChecked) == 0);
+	if (pi->body != nullptr && e != nullptr) {
+		GB_ASSERT((e->flags & EntityFlag_ProcBodyChecked) == 0);
 	}
 
 	check_proc_body(&ctx, pi->token, pi->decl, pi->type, pi->body);
-	if (pi->body != nullptr && pi->decl->entity != nullptr) {
-		pi->decl->entity->flags |= EntityFlag_ProcBodyChecked;
+	if (e != nullptr) {
+		e->flags |= EntityFlag_ProcBodyChecked;
 	}
 	pi->decl->proc_checked = true;
 	add_untyped_expressions(&c->info, ctx.untyped);
+	return true;
 }
 
 GB_STATIC_ASSERT(sizeof(isize) == sizeof(void *));
@@ -4681,9 +4670,10 @@ void check_unchecked_bodies(Checker *c) {
 	ProcInfo *pi = nullptr;
 	while (mpmc_dequeue(q, &pi)) {
 		Entity *e = pi->decl->entity;
-		consume_proc_info_queue(c, pi, q, &untyped);
-		add_dependency_to_set(c, e);
-		GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+		if (consume_proc_info_queue(c, pi, q, &untyped)) {
+			add_dependency_to_set(c, e);
+			GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+		}
 	}
 
 }
@@ -4728,15 +4718,15 @@ bool consume_proc_info_queue(Checker *c, ProcInfo *pi, ProcBodyQueue *q, Untyped
 		// NOTE(bill): In single threaded mode, this should never happen
 		if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
 			mpmc_enqueue(q, pi);
-			return true;
+			return false;
 		}
 	}
 	if (untyped) {
 		map_clear(untyped);
 	}
-	check_proc_info(c, pi, untyped, q);
+	bool ok = check_proc_info(c, pi, untyped, q);
 	total_bodies_checked.fetch_add(1, std::memory_order_relaxed);
-	return false;
+	return ok;
 }
 
 struct ThreadProcBodyData {
@@ -4747,11 +4737,11 @@ struct ThreadProcBodyData {
 	ThreadProcBodyData *all_data;
 };
 
-THREAD_PROC(thread_proc_body) {
-	ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_body) {
+	ThreadProcBodyData *bd = cast(ThreadProcBodyData *)data;
+	Checker *c = bd->checker;
 	GB_ASSERT(c != nullptr);
-	ProcBodyQueue *this_queue = data->queue;
+	ProcBodyQueue *this_queue = bd->queue;
 
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
@@ -4821,31 +4811,14 @@ void check_procedure_bodies(Checker *c) {
 	}
 	GB_ASSERT(total_queued == original_queue_count);
 
-
 	semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
-
-	Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
-	for (isize i = 0; i < worker_count; i++) {
-		thread_init(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_start(threads+i, thread_proc_body, thread_data+i);
+	
+	for (isize i = 0; i < thread_count; i++) {
+		global_thread_pool_add_task(thread_proc_body, thread_data+i);
 	}
-	Thread dummy_main_thread = {};
-	dummy_main_thread.user_data = thread_data+worker_count;
-	thread_proc_body(&dummy_main_thread);
-
+	global_thread_pool_wait();
 	semaphore_wait(&c->procs_to_check_semaphore);
 
-	for (isize i = 0; i < worker_count; i++) {
-		thread_join(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_destroy(threads+i);
-	}
-
 	isize global_remaining = c->procs_to_check_queue.count.load(std::memory_order_relaxed);
 	GB_ASSERT(global_remaining == 0);
 

src/common.cpp (+1 -3)

@@ -44,11 +44,9 @@ void debugf(char const *fmt, ...);
 #include "queue.cpp"
 #include "common_memory.cpp"
 #include "string.cpp"
-
-
-
 #include "range_cache.cpp"
 
+
 u32 fnv32a(void const *data, isize len) {
 	u8 const *bytes = cast(u8 const *)data;
 	u32 h = 0x811c9dc5;

src/llvm_backend.cpp (+2 -5)

@@ -1104,9 +1104,6 @@ void lb_generate_code(lbGenerator *gen) {
 
 	LLVMBool do_threading = (LLVMIsMultithreaded() && USE_SEPARATE_MODULES && MULTITHREAD_OBJECT_GENERATION && worker_count > 0);
 
-	thread_pool_init(&lb_thread_pool, heap_allocator(), worker_count, "LLVMBackend");
-	defer (thread_pool_destroy(&lb_thread_pool));
-
 	lbModule *default_module = &gen->default_module;
 	CheckerInfo *info = gen->info;
 
@@ -1691,10 +1688,10 @@ void lb_generate_code(lbGenerator *gen) {
 			wd->code_gen_file_type = code_gen_file_type;
 			wd->filepath_obj = filepath_obj;
 			wd->m = m;
-			thread_pool_add_task(&lb_thread_pool, lb_llvm_emit_worker_proc, wd);
+			global_thread_pool_add_task(lb_llvm_emit_worker_proc, wd);
 		}
 
-		thread_pool_wait(&lb_thread_pool);
+		thread_pool_wait(&global_thread_pool);
 	} else {
 		for_array(j, gen->modules.entries) {
 			lbModule *m = gen->modules.entries[j].value;

src/llvm_backend_general.cpp (+0 -2)

@@ -1,7 +1,5 @@
 void lb_add_debug_local_variable(lbProcedure *p, LLVMValueRef ptr, Type *type, Token const &token);
 
-gb_global ThreadPool lb_thread_pool = {};
-
 gb_global Entity *lb_global_type_info_data_entity   = {};
 gb_global lbAddr lb_global_type_info_member_types   = {};
 gb_global lbAddr lb_global_type_info_member_names   = {};

src/main.cpp (+17 -0)

@@ -7,6 +7,20 @@
 #include "exact_value.cpp"
 #include "build_settings.cpp"
 
+gb_global ThreadPool global_thread_pool;
+void init_global_thread_pool(void) {
+	isize thread_count = gb_max(build_context.thread_count, 1);
+	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+	thread_pool_init(&global_thread_pool, permanent_allocator(), worker_count, "ThreadPoolWorker");
+}
+bool global_thread_pool_add_task(WorkerTaskProc *proc, void *data) {
+	return thread_pool_add_task(&global_thread_pool, proc, data);
+}
+void global_thread_pool_wait(void) {
+	thread_pool_wait(&global_thread_pool);
+}
+
+
 void debugf(char const *fmt, ...) {
 	if (build_context.show_debug_messages) {
 		gb_printf_err("[DEBUG] ");
@@ -2160,6 +2174,9 @@ int main(int arg_count, char const **arg_ptr) {
 	// 	return 1;
 	// }
 
+	init_global_thread_pool();
+	defer (thread_pool_destroy(&global_thread_pool));
+	
 	init_universal();
 	// TODO(bill): prevent compiling without a linker
 

src/parser.cpp (+37 -32)

@@ -4810,6 +4810,7 @@ bool init_parser(Parser *p) {
 	string_set_init(&p->imported_files, heap_allocator());
 	array_init(&p->packages, heap_allocator());
 	array_init(&p->package_imports, heap_allocator());
+	mutex_init(&p->wait_mutex);
 	mutex_init(&p->import_mutex);
 	mutex_init(&p->file_add_mutex);
 	mutex_init(&p->file_decl_mutex);
@@ -4837,6 +4838,7 @@ void destroy_parser(Parser *p) {
 	array_free(&p->packages);
 	array_free(&p->package_imports);
 	string_set_destroy(&p->imported_files);
+	mutex_destroy(&p->wait_mutex);
 	mutex_destroy(&p->import_mutex);
 	mutex_destroy(&p->file_add_mutex);
 	mutex_destroy(&p->file_decl_mutex);
@@ -4870,7 +4872,7 @@ void parser_add_file_to_process(Parser *p, AstPackage *pkg, FileInfo fi, TokenPo
 	auto wd = gb_alloc_item(heap_allocator(), ParserWorkerData);
 	wd->parser = p;
 	wd->imported_file = f;
-	thread_pool_add_task(&parser_thread_pool, parser_worker_proc, wd);
+	global_thread_pool_add_task(parser_worker_proc, wd);
 }
 
 WORKER_TASK_PROC(foreign_file_worker_proc) {
@@ -4909,7 +4911,7 @@ void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, AstForeignFi
 	wd->parser = p;
 	wd->imported_file = f;
 	wd->foreign_kind = kind;
-	thread_pool_add_task(&parser_thread_pool, foreign_file_worker_proc, wd);
+	global_thread_pool_add_task(foreign_file_worker_proc, wd);
 }
 
 
@@ -5619,10 +5621,6 @@ ParseFileError process_imported_file(Parser *p, ImportedFile imported_file) {
 ParseFileError parse_packages(Parser *p, String init_filename) {
 	GB_ASSERT(init_filename.text[init_filename.len] == 0);
 
-	isize thread_count = gb_max(build_context.thread_count, 1);
-	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
-	thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork");
-
 	String init_fullpath = path_to_full_path(heap_allocator(), init_filename);
 	if (!path_is_directory(init_fullpath)) {
 		String const ext = str_lit(".odin");
@@ -5631,38 +5629,45 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 			return ParseFile_WrongExtension;
 		}
 	}
+	
 
-	TokenPos init_pos = {};
-	{
-		String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
-		try_add_import_path(p, s, s, init_pos, Package_Runtime);
-	}
+	{ // Add these packages serially and then process them parallel
+		mutex_lock(&p->wait_mutex);
+		defer (mutex_unlock(&p->wait_mutex));
+		
+		TokenPos init_pos = {};
+		{
+			String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
+			try_add_import_path(p, s, s, init_pos, Package_Runtime);
+		}
 
-	try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
-	p->init_fullpath = init_fullpath;
+		try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
+		p->init_fullpath = init_fullpath;
 
-	if (build_context.command_kind == Command_test) {
-		String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
-		try_add_import_path(p, s, s, init_pos, Package_Normal);
-	}
-
-	for_array(i, build_context.extra_packages) {
-		String path = build_context.extra_packages[i];
-		String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
-		if (!path_is_directory(fullpath)) {
-			String const ext = str_lit(".odin");
-			if (!string_ends_with(fullpath, ext)) {
-				error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
-				return ParseFile_WrongExtension;
-			}
+		if (build_context.command_kind == Command_test) {
+			String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
+			try_add_import_path(p, s, s, init_pos, Package_Normal);
 		}
-		AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
-		if (pkg) {
-			pkg->is_extra = true;
+		
+
+		for_array(i, build_context.extra_packages) {
+			String path = build_context.extra_packages[i];
+			String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
+			if (!path_is_directory(fullpath)) {
+				String const ext = str_lit(".odin");
+				if (!string_ends_with(fullpath, ext)) {
+					error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
+					return ParseFile_WrongExtension;
+				}
+			}
+			AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
+			if (pkg) {
+				pkg->is_extra = true;
+			}
 		}
 	}
-
-	thread_pool_wait(&parser_thread_pool);
+	
+	global_thread_pool_wait();
 
 	for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) {
 		if (err != ParseFile_None) {

src/parser.hpp (+1 -3)

@@ -191,6 +191,7 @@ struct Parser {
 	isize                     file_to_process_count;
 	isize                     total_token_count;
 	isize                     total_line_count;
+	BlockingMutex             wait_mutex;
 	BlockingMutex             import_mutex;
 	BlockingMutex             file_add_mutex;
 	BlockingMutex             file_decl_mutex;
@@ -198,9 +199,6 @@ struct Parser {
 	MPMCQueue<ParseFileError> file_error_queue;
 };
 
-
-gb_global ThreadPool parser_thread_pool = {};
-
 struct ParserWorkerData {
 	Parser *parser;
 	ImportedFile imported_file;

src/thread_pool.cpp (+46 -25)

@@ -19,10 +19,15 @@ struct ThreadPool {
 	WorkerTask *task_queue;
 	
 	std::atomic<isize> ready;
+	std::atomic<bool>  stop;
+	
 };
 
+THREAD_PROC(thread_pool_thread_proc);
+
 void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
 	pool->allocator = a;
+	pool->stop = false;
 	mutex_init(&pool->mutex);
 	condition_init(&pool->task_cond);
 	
@@ -31,9 +36,15 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count
 		Thread *t = &pool->threads[i];
 		thread_init(t);
 	}
+	
+	for_array(i, pool->threads) {
+		Thread *t = &pool->threads[i];
+		thread_start(t, thread_pool_thread_proc, pool);
+	}
 }
 
 void thread_pool_destroy(ThreadPool *pool) {
+	pool->stop = true;
 	condition_broadcast(&pool->task_cond);
 
 	for_array(i, pool->threads) {
@@ -41,20 +52,20 @@ void thread_pool_destroy(ThreadPool *pool) {
 		thread_join(t);
 	}
 	
-	
 	for_array(i, pool->threads) {
 		Thread *t = &pool->threads[i];
 		thread_destroy(t);
 	}
 	
 	gb_free(pool->allocator, pool->threads.data);
-	condition_destroy(&pool->task_cond);
 	mutex_destroy(&pool->mutex);
+	condition_destroy(&pool->task_cond);
 }
 
 bool thread_pool_queue_empty(ThreadPool *pool) {
 	return pool->task_queue == nullptr;
 }
+
 WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
 	GB_ASSERT(pool->task_queue != nullptr);
 	WorkerTask *task = pool->task_queue;
@@ -80,30 +91,34 @@ bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
 	task->data = data;
 		
 	thread_pool_queue_push(pool, task);
+	GB_ASSERT(pool->ready >= 0);
 	pool->ready++;
 	mutex_unlock(&pool->mutex);
 	condition_signal(&pool->task_cond);
 	return true;
 }	
 
-THREAD_PROC(thread_pool_thread_proc) {
-	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-	
+
+void thread_pool_do_task(WorkerTask *task) {
+	task->do_work(task->data);
+}
+
+void thread_pool_wait(ThreadPool *pool) {
 	for (;;) {
 		mutex_lock(&pool->mutex);
-		
-		while (pool->ready > 0 && thread_pool_queue_empty(pool)) {
+
+		while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
 			condition_wait(&pool->task_cond, &pool->mutex);
 		}
-		if (pool->ready == 0 && thread_pool_queue_empty(pool)) {
+		if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
 			mutex_unlock(&pool->mutex);
-			return 0;
+			return;
 		}
-		
+
 		WorkerTask *task = thread_pool_queue_pop(pool);
 		mutex_unlock(&pool->mutex);
 	
-		task->do_work(task->data);
+		thread_pool_do_task(task);
 		if (--pool->ready == 0) {
 			condition_broadcast(&pool->task_cond);
 		}
@@ -111,20 +126,26 @@ THREAD_PROC(thread_pool_thread_proc) {
 }
 
 
-void thread_pool_wait(ThreadPool *pool) {
-	for_array(i, pool->threads) {
-		Thread *t = &pool->threads[i];
-		thread_start(t, thread_pool_thread_proc, pool);
-	}
+THREAD_PROC(thread_pool_thread_proc) {
+	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
 	
-	Thread dummy = {};
-	dummy.proc = thread_pool_thread_proc;
-	dummy.user_data = pool;
-	thread_pool_thread_proc(&dummy);
+	for (;;) {
+		mutex_lock(&pool->mutex);
+
+		while (!pool->stop && thread_pool_queue_empty(pool)) {
+			condition_wait(&pool->task_cond, &pool->mutex);
+		}
+		if (pool->stop && thread_pool_queue_empty(pool)) {
+			mutex_unlock(&pool->mutex);
+			return 0;
+		}
+
+		WorkerTask *task = thread_pool_queue_pop(pool);
+		mutex_unlock(&pool->mutex);
 	
-	for_array(i, pool->threads) {
-		Thread *t = &pool->threads[i];
-		thread_join(t);
+		thread_pool_do_task(task);
+		if (--pool->ready == 0) {
+			condition_broadcast(&pool->task_cond);
+		}
 	}
-}
-
+}
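
For context, a hedged sketch of the reworked pool's lifetime (names taken from this file; my_task and my_data are hypothetical): workers are started once in thread_pool_init and loop in thread_pool_thread_proc until the new `stop` flag is set, while thread_pool_wait now turns the calling thread into an extra worker that pops tasks until `ready` drops to zero.

ThreadPool pool = {};
thread_pool_init(&pool, heap_allocator(), 4, "Example"); // spawns and starts 4 worker threads
thread_pool_add_task(&pool, my_task, my_data);           // bumps pool.ready and signals task_cond
thread_pool_wait(&pool);                                  // caller runs tasks too; returns when ready == 0
thread_pool_destroy(&pool);                               // sets stop, broadcasts, joins and frees the workers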

src/threading.cpp (+40 -0)

@@ -231,6 +231,46 @@ void yield_process(void);
 		
 	}
 #endif
+	
+	
+struct Barrier {
+	BlockingMutex mutex;
+	Condition cond;
+	isize index;
+	isize generation_id;
+	isize thread_count;	
+};
+
+void barrier_init(Barrier *b, isize thread_count) {
+	mutex_init(&b->mutex);
+	condition_init(&b->cond);
+	b->index = 0;
+	b->generation_id = 0;
+	b->thread_count = thread_count;
+}
+
+void barrier_destroy(Barrier *b) {
+	condition_destroy(&b->cond);
+	mutex_destroy(&b->mutex);
+}
+
+// Returns true if it is the leader
+bool barrier_wait(Barrier *b) {
+	mutex_lock(&b->mutex);
+	defer (mutex_unlock(&b->mutex));
+	isize local_gen = b->generation_id;
+	b->index += 1;
+	if (b->index < b->thread_count) {
+		while (local_gen == b->generation_id && b->index < b->thread_count) {
+			condition_wait(&b->cond, &b->mutex);
+		}
+		return false;
+	}
+	b->index = 0;
+	b->generation_id += 1;
+	condition_broadcast(&b->cond);
+	return true;
+}
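
The Barrier added here has no callers in this diff; a hedged sketch of the intended usage, where g_barrier, worker_step, phase_one, and phase_two are hypothetical and the barrier is initialised with the number of participating threads:

gb_global Barrier g_barrier; // barrier_init(&g_barrier, thread_count) once at startup

// run by each of the `thread_count` participating threads
void worker_step(void) {
	phase_one();                  // every thread does its share of phase one
	if (barrier_wait(&g_barrier)) {
		// exactly one thread per generation returns true and acts as the leader
	}
	phase_two();                  // no thread starts phase two before all have finished phase one
}

// barrier_destroy(&g_barrier) once all workers have exited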