Browse Source

Big improvement to the `-threaded-checker` code, unifying the logic and simplify behaviour

gingerBill 4 years ago
parent
commit
cec2309504
4 changed files with 108 additions and 46 deletions
  1. 1 2
      src/check_decl.cpp
  2. 2 2
      src/check_expr.cpp
  3. 101 41
      src/checker.cpp
  4. 4 1
      src/checker.hpp

+ 1 - 2
src/check_decl.cpp

@@ -786,8 +786,7 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) {
 
 		GB_ASSERT(pl->body->kind == Ast_BlockStmt);
 		if (!pt->is_polymorphic) {
-			// check_procedure_now(ctx->checker, ctx->file, e->token, d, proc_type, pl->body, pl->tags);
-			check_procedure_later(ctx->checker, ctx->file, e->token, d, proc_type, pl->body, pl->tags);
+			check_procedure_later(ctx, ctx->file, e->token, d, proc_type, pl->body, pl->tags);
 		}
 	} else if (!is_foreign) {
 		if (e->Procedure.is_export) {

+ 2 - 2
src/check_expr.cpp

@@ -443,7 +443,7 @@ bool find_or_generate_polymorphic_procedure(CheckerContext *c, Entity *base_enti
 	}
 
 	// NOTE(bill): Check the newly generated procedure body
-	check_procedure_later(nctx.checker, proc_info);
+	check_procedure_later(&nctx, proc_info);
 
 	return true;
 }
@@ -6497,7 +6497,7 @@ ExprKind check_expr_base_internal(CheckerContext *c, Operand *o, Ast *node, Type
 			}
 
 			pl->decl = decl;
-			check_procedure_later(ctx.checker, ctx.file, empty_token, decl, type, pl->body, pl->tags);
+			check_procedure_later(&ctx, ctx.file, empty_token, decl, type, pl->body, pl->tags);
 		}
 		check_close_scope(&ctx);
 

+ 101 - 41
src/checker.cpp

@@ -925,8 +925,10 @@ void reset_checker_context(CheckerContext *ctx, AstFile *file) {
 		return;
 	}
 	destroy_checker_context(ctx);
+	auto *queue = ctx->procs_to_check_queue;
 	*ctx = make_checker_context(ctx->checker);
 	add_curr_ast_file(ctx, file);
+	ctx->procs_to_check_queue = queue;
 }
 
 
@@ -966,8 +968,8 @@ void destroy_checker(Checker *c) {
 
 	destroy_checker_context(&c->builtin_ctx);
 
-	// mpmc_destroy(&c->procs_to_check_queue);
-	// gb_semaphore_destroy(&c->procs_to_check_semaphore);
+	mpmc_destroy(&c->procs_to_check_queue);
+	gb_semaphore_destroy(&c->procs_to_check_semaphore);
 }
 
 
@@ -1519,15 +1521,21 @@ void add_type_info_type(CheckerContext *c, Type *t) {
 	}
 }
 
-void check_procedure_later(Checker *c, ProcInfo *info) {
+gb_global bool global_procedure_body_in_worker_queue = false;
+
+void check_procedure_later(CheckerContext *c, ProcInfo *info) {
 	GB_ASSERT(info != nullptr);
 	GB_ASSERT(info->decl != nullptr);
 
-	mpmc_enqueue(&c->procs_to_check_queue, info);
-	gb_semaphore_post(&c->procs_to_check_semaphore, 1);
+	if (build_context.threaded_checker && global_procedure_body_in_worker_queue) {
+		GB_ASSERT(c->procs_to_check_queue != nullptr);
+	}
+
+	auto *queue = c->procs_to_check_queue ? c->procs_to_check_queue : &c->checker->procs_to_check_queue;
+	mpmc_enqueue(queue, info);
 }
 
-void check_procedure_later(Checker *c, AstFile *file, Token token, DeclInfo *decl, Type *type, Ast *body, u64 tags) {
+void check_procedure_later(CheckerContext *c, AstFile *file, Token token, DeclInfo *decl, Type *type, Ast *body, u64 tags) {
 	ProcInfo *info = gb_alloc_item(permanent_allocator(), ProcInfo);
 	info->file  = file;
 	info->token = token;
@@ -4274,7 +4282,7 @@ void calculate_global_init_order(Checker *c) {
 }
 
 
-void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped) {
+void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
 	if (pi == nullptr) {
 		return;
 	}
@@ -4290,6 +4298,7 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped) {
 	reset_checker_context(&ctx, pi->file);
 	ctx.decl = pi->decl;
 	ctx.untyped = untyped;
+	ctx.procs_to_check_queue = procs_to_check_queue;
 
 	TypeProc *pt = &pi->type->Proc;
 	String name = pi->token.string;
@@ -4370,7 +4379,7 @@ void check_unchecked_bodies(Checker *c) {
 			}
 
 			map_clear(&untyped);
-			check_proc_info(c, &pi, &untyped);
+			check_proc_info(c, &pi, &untyped, nullptr);
 		}
 	}
 }
@@ -4407,45 +4416,22 @@ void check_test_names(Checker *c) {
 
 }
 
-static bool proc_bodies_is_running;
+struct ThreadProcBodyData {
+	Checker *checker;
+	ProcBodyQueue *queue;
+};
 
-GB_THREAD_PROC(thread_proc_body) {
-	Checker *c = cast(Checker *)thread->user_data;
-	auto *q = &c->procs_to_check_queue;
-	ProcInfo *pi = nullptr;
+gb_global std::atomic<isize> total_bodies_checked;
 
+GB_THREAD_PROC(thread_proc_body) {
+	ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data;
+	Checker *c = data->checker;
+	ProcBodyQueue *q = data->queue;
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
 	defer (map_destroy(&untyped));
 
-	while (proc_bodies_is_running) {
-		gb_semaphore_wait(&c->procs_to_check_semaphore);
-
-		if (mpmc_dequeue(q, &pi)) {
-			if (pi->decl->parent && pi->decl->parent->entity) {
-				Entity *parent = pi->decl->parent->entity;
-				// NOTE(bill): Only check a nested procedure if its parent's body has been checked first
-				// This is prevent any possible race conditions in evaluation when multithreaded
-				// NOTE(bill): In single threaded mode, this should never happen
-				if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
-					mpmc_enqueue(q, pi);
-					continue;
-				}
-			}
-			map_clear(&untyped);
-			check_proc_info(c, pi, &untyped);
-		}
-	}
-
-	gb_semaphore_release(&c->procs_to_check_semaphore);
-
-	return 0;
-}
-
-void check_procedure_bodies(Checker *c) {
-	auto *q = &c->procs_to_check_queue;
 	ProcInfo *pi = nullptr;
-
 	while (mpmc_dequeue(q, &pi)) {
 		GB_ASSERT(pi->decl != nullptr);
 		if (pi->decl->parent && pi->decl->parent->entity) {
@@ -4458,10 +4444,84 @@ void check_procedure_bodies(Checker *c) {
 				continue;
 			}
 		}
-		check_proc_info(c, pi, nullptr);
+		map_clear(&untyped);
+		check_proc_info(c, pi, &untyped, q);
+		total_bodies_checked.fetch_add(1, std::memory_order_relaxed);
 	}
+
+	gb_semaphore_release(&c->procs_to_check_semaphore);
+
+	return 0;
 }
 
+void check_procedure_bodies(Checker *c) {
+
+	isize thread_count = gb_max(build_context.thread_count, 1);
+	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+	if (!build_context.threaded_checker) {
+		worker_count = 0;
+	}
+
+	global_procedure_body_in_worker_queue = true;
+
+	isize original_queue_count = c->procs_to_check_queue.count.load(std::memory_order_relaxed);
+	isize load_count = (original_queue_count+thread_count-1)/thread_count;
+
+	ThreadProcBodyData *thread_data = gb_alloc_array(permanent_allocator(), ThreadProcBodyData, thread_count);
+	for (isize i = 0; i < thread_count; i++) {
+		ThreadProcBodyData *data = thread_data + i;
+		data->checker = c;
+		data->queue = gb_alloc_item(permanent_allocator(), ProcBodyQueue);
+		mpmc_init(data->queue, heap_allocator(), next_pow2_isize(load_count*2));
+	}
+
+	for (isize i = 0; i < thread_count; i++) {
+		ProcBodyQueue *queue = thread_data[i].queue;
+		for (isize j = 0; j < load_count; j++) {
+			ProcInfo *pi = nullptr;
+			if (!mpmc_dequeue(&c->procs_to_check_queue, &pi)) {
+				break;
+			}
+			mpmc_enqueue(queue, pi);
+		}
+	}
+	isize total_queued = 0;
+	for (isize i = 0; i < thread_count; i++) {
+		ProcBodyQueue *queue = thread_data[i].queue;
+		total_queued += queue->count.load();
+	}
+	GB_ASSERT(total_queued == original_queue_count);
+
+
+	gb_semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
+
+	gbThread *threads = gb_alloc_array(permanent_allocator(), gbThread, worker_count);
+	for (isize i = 0; i < worker_count; i++) {
+		gb_thread_init(threads+i);
+	}
+
+	for (isize i = 0; i < worker_count; i++) {
+		gb_thread_start(threads+i, thread_proc_body, thread_data+i);
+	}
+	gbThread dummy_main_thread = {};
+	dummy_main_thread.user_data = thread_data+worker_count;
+	thread_proc_body(&dummy_main_thread);
+	gb_semaphore_release(&c->procs_to_check_semaphore);
+
+	for (isize i = 0; i < worker_count; i++) {
+		gb_thread_join(threads+i);
+	}
+
+	gb_semaphore_wait(&c->procs_to_check_semaphore);
+
+	for (isize i = 0; i < worker_count; i++) {
+		gb_thread_destroy(threads+i);
+	}
+
+	// gb_printf_err("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed));
+
+	global_procedure_body_in_worker_queue = false;
+}
 void add_untyped_expressions(CheckerInfo *cinfo, UntypedExprInfoMap *untyped) {
 	if (untyped == nullptr) {
 		return;

+ 4 - 1
src/checker.hpp

@@ -265,6 +265,7 @@ struct UntypedExprInfo {
 };
 
 typedef Map<ExprInfo *> UntypedExprInfoMap; // Key: Ast *
+typedef MPMCQueue<ProcInfo *> ProcBodyQueue;
 
 // CheckerInfo stores all the symbol information for a type-checked program
 struct CheckerInfo {
@@ -358,6 +359,8 @@ struct CheckerContext {
 	Scope *    polymorphic_scope;
 
 	Ast *assignment_lhs_hint;
+
+	ProcBodyQueue *procs_to_check_queue;
 };
 
 struct Checker {
@@ -368,7 +371,7 @@ struct Checker {
 
 	MPMCQueue<Entity *> procs_with_deferred_to_check;
 
-	MPMCQueue<ProcInfo *> procs_to_check_queue;
+	ProcBodyQueue procs_to_check_queue;
 	gbSemaphore procs_to_check_semaphore;
 
 	gbMutex poly_type_mutex;